streamly
concatMapWith, zip style operations and non-empty streams
@tymefighter asked a question about zipping a container of streams, which prompted me to think about this problem. When zipping the streams in a stream of streams, the following two formulations of the same container, container1 and container2, do not behave identically:
stream = S.fromList [1,2]
container1 = S.yield stream <> S.nil
container2 = S.yield stream
If we fold the container using concatMapWith (S.zipWith (+)) we will get different results for these:
> S.toList (S.concatMapWith (S.zipWith (+)) id container1) >>= print
[]
> S.toList (S.concatMapWith (S.zipWith (+)) id container2) >>= print
[1,2]
In fact, if a stream is constructed with S.nil at the end, the zip operation will always result in an empty stream, because we end up zipping all the streams inside the container with an empty stream. container2 is different: S.yield constructs the stream using the single continuation, so concatMapWith takes the last element of the container, rather than an empty stream, as the last stream to zip. It therefore always produces the expected zip result.
It may be easier to understand this if we look at the implementation of concatMapWith, which is a flipped bindWith:
bindWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> t m a
    -> (a -> t m b)
    -> t m b
bindWith par m1 f = go m1
    where
        go m =
            mkStream $ \st yld sng stp ->
                let foldShared = foldStreamShared st yld sng stp
                    single a   = foldShared $ unShare (f a)
                    yieldk a r = foldShared $ unShare (f a) `par` go r
                in foldStream (adaptState st) yieldk single stp m
The go r at the end is an empty stream when the container is terminated with the stop continuation; when the container is terminated with the single continuation, the last element is handled by single and there is no trailing go r at all. The S.yield operation uses the single continuation to create a stream.
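To make the three cases concrete, here is a minimal sketch using a simplified, pure model of the stream. The Stream data type and the helpers toList, fromList, append and zipWithS are illustrative assumptions, not streamly's actual CPS representation; only the case analysis in concatMapWith is meant to mirror go above.

```haskell
-- A simplified, pure model of the CPS stream (an assumption for
-- illustration, not streamly's actual representation).
data Stream a
  = Stop                 -- models the stop continuation
  | Single a             -- models the single continuation
  | Yield a (Stream a)   -- models the yield continuation

toList :: Stream a -> [a]
toList Stop        = []
toList (Single a)  = [a]
toList (Yield a r) = a : toList r

fromList :: [a] -> Stream a
fromList = foldr Yield Stop

-- Serial append; appending anything to a Single turns it into a Yield.
append :: Stream a -> Stream a -> Stream a
append Stop        s = s
append (Single a)  s = Yield a s
append (Yield a r) s = Yield a (append r s)

-- Zip by going through lists; the result ends at the shorter input.
zipWithS :: (a -> b -> c) -> Stream a -> Stream b -> Stream c
zipWithS f xs ys = fromList (zipWith f (toList xs) (toList ys))

-- Mirrors the three cases (stop, single, yield) of bindWith's go.
concatMapWith
  :: (Stream b -> Stream b -> Stream b)
  -> (a -> Stream b)
  -> Stream a
  -> Stream b
concatMapWith par f = go
  where
    go Stop        = Stop
    go (Single a)  = f a
    go (Yield a r) = f a `par` go r

stream :: Stream Int
stream = fromList [1, 2]

container1, container2 :: Stream (Stream Int)
container1 = Single stream `append` Stop  -- becomes Yield stream Stop
container2 = Single stream

main :: IO ()
main = do
  print (toList (concatMapWith (zipWithS (+)) id container1))  -- []
  print (toList (concatMapWith (zipWithS (+)) id container2))  -- [1,2]
```

In this model container1 ends in Stop, so the final par zips stream with an empty stream, while container2 is a bare Single and its element is returned without any zip.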
This is a peculiar issue, because concatMapWith behaves differently for two streams that are essentially equivalent. I can't think of a good solution to this. One way would be to always terminate streams with nil, i.e. remove the single continuation. That way we would always get consistent behavior with respect to zip. But then how do we implement the desired concatMapWith behavior for zips? For that we would need streams with a single continuation but no stop continuation, basically a representation for non-empty streams. Would it be a good idea to have a non-empty stream module?
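As a rough illustration of what a non-empty representation could look like, here is a sketch over plain lists. Stream1, fromNonEmpty and concatMapWith1 are hypothetical names invented for this example and do not exist in streamly.

```haskell
-- Hypothetical non-empty stream: there is no Stop constructor, so a
-- value of this type always ends in Single.
data Stream1 a
  = Single a
  | Yield a (Stream1 a)

-- Build a non-empty stream from a head and a possibly empty tail.
fromNonEmpty :: a -> [a] -> Stream1 a
fromNonEmpty x []       = Single x
fromNonEmpty x (y : ys) = Yield x (fromNonEmpty y ys)

-- Fold the container with a combining function. The last element is
-- used as is, so no empty seed is ever passed to the combiner.
concatMapWith1 :: (s -> s -> s) -> (a -> s) -> Stream1 a -> s
concatMapWith1 par f = go
  where
    go (Single a)  = f a
    go (Yield a r) = f a `par` go r

main :: IO ()
main = do
  let one   = fromNonEmpty ([1, 2] :: [Int]) []
      three = fromNonEmpty ([1, 2] :: [Int]) [[10, 20], [100, 200]]
  print (concatMapWith1 (zipWith (+)) id one)    -- [1,2]
  print (concatMapWith1 (zipWith (+)) id three)  -- [111,222]
```

Because the type has no empty case, the zip never sees an implicit empty stream, which is the behavior wanted for zips. The cost is that every producer has to prove non-emptiness up front.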
Also note that we currently do not have an equivalent of the single continuation in the direct representation of streams (i.e. the StreamD module). So when rewrite rules convert streams from one representation to the other (CPS and direct style), we may lose the single-continuation-based representation.
Another potential way to handle it is to always combine streams in such a way that a non-empty stream is guaranteed to end with the single continuation. This would require keeping the last element buffered until we can determine whether there is a next value in the stream, i.e. whether the next step is stop.
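The buffering idea can be sketched on a simplified, pure model of the stream. The Stream type and normalize function below are assumptions made for illustration only; a real implementation would have to do the same one-step lookahead inside the CPS combinators.

```haskell
-- Simplified model of a stream with stop, single and yield cases
-- (an assumption for illustration, not streamly's representation).
data Stream a
  = Stop
  | Single a
  | Yield a (Stream a)
  deriving (Eq, Show)

-- Rewrite a stream so that, if it is non-empty, it ends in Single.
-- Matching on (Yield a Stop) inspects only the next step of the tail,
-- which corresponds to holding one element back until we know whether
-- anything follows it.
normalize :: Stream a -> Stream a
normalize Stop           = Stop
normalize (Single a)     = Single a
normalize (Yield a Stop) = Single a
normalize (Yield a r)    = Yield a (normalize r)

main :: IO ()
main = do
  print (normalize (Yield 1 (Yield 2 Stop) :: Stream Int))  -- Yield 1 (Single 2)
  print (normalize (Stop :: Stream Int))                    -- Stop
```

The lookahead costs one extra step of evaluation per element: an element cannot be emitted until the producer of the next one has at least reported whether it will stop.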
Is this only for functions that inspect their elements? I mean, with the old type signature of concatMapWith, is there an equivalent example?
The problem is specific to zipWith; there won't be a problem with mergeBy. zipWith takes into account the length of all the streams being zipped, so adding an empty stream as one of them nullifies all the others. I do not know whether a similar issue can occur in any other context. But it does exist in other APIs that are special cases of concatMapWith itself:
> foldWith (zipWith (+)) [S.fromList [1,2] :: SerialT Identity Int, S.fromList[1,2]]
fromList []
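The difference between zipWith and a merge can be reproduced with plain lists. This sketch assumes that foldWith folds from the right with an empty stream as the seed, which is what the output above suggests; merge is a hand-written stand-in for mergeBy, not streamly's function.

```haskell
-- Merge two ascending lists. The empty list is an identity for merge:
-- merging with [] returns the other list unchanged.
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs [] = xs
merge (x : xs) (y : ys)
  | x <= y    = x : merge xs (y : ys)
  | otherwise = y : merge (x : xs) ys

streams :: [[Int]]
streams = [[1, 2], [1, 2]]

main :: IO ()
main = do
  -- The empty seed is absorbing for zipWith, so everything is lost.
  print (foldr (zipWith (+)) [] streams)  -- []
  -- Folding without a seed uses the last list as the starting point.
  print (foldr1 (zipWith (+)) streams)    -- [2,4]
  -- The empty seed is harmless for merge.
  print (foldr merge [] streams)          -- [1,1,2,2]
```

An empty seed is safe only when it is an identity for the combining function. For zipWith it is an absorbing element instead, which is why the fold collapses to an empty result.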