Some pipes occasionally get ahead when broadcasting through multiple pipes.
Some pipes occasionally get ahead by 1 element when broadcasting an un-chunked stream through multiple pipes using broadcastThrough.
A reproduction of the issue, using Scala 2.13.6 and fs2 3.1.1, can be found in this project or on scastie.
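For reference, a minimal sketch along these lines shows the shape of the problem (the names, delays, and 50-element source are illustrative; the linked project and scastie snippet are the actual reproduction):

```scala
import scala.concurrent.duration._
import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}

// Two pipes broadcast from an un-chunked source: the fast pipe records the
// last element it has handled, and the slow pipe, while still processing
// element i, can observe that the fast pipe has already moved past i.
object BroadcastAheadSketch extends IOApp.Simple {

  def run: IO[Unit] =
    Ref.of[IO, Int](-1).flatMap { fastPos =>
      val fast: Pipe[IO, Int, Nothing] =
        _.evalMap(i => fastPos.set(i)).drain

      val slow: Pipe[IO, Int, Nothing] =
        _.evalMap { i =>
          IO.sleep(10.millis) *> fastPos.get.flatMap { f =>
            IO.println(s"slow is on $i, fast has already seen $f")
          }
        }.drain

      Stream
        .range(0, 50)
        .chunkLimit(1)
        .flatMap(Stream.chunk) // force singleton chunks, i.e. an un-chunked stream
        .covary[IO]
        .broadcastThrough(fast, slow)
        .compile
        .drain
    }
}
```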
As explained in the chat, this is due to the fact that publishing to a Topic (which backs broadcastThrough) is not atomic: it enqueues the same element to a set of bounded channels, one per subscriber. So it's true that publish will block when one of the channels is full (which is why no subscriber can get ahead by more than 1), but by then it may already have published to some of the other subscribers (which is why they can get ahead by 1).
Making the publishing atomic should be possible, but it isn't trivial, especially without introducing too much contention (like in the PubSub-based Topic, which had performance issues).
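To illustrate the shape of that mechanism, here is a toy model (not Topic's real internals; the bounded queues stand in for the per-subscriber channels):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._

// Toy model only, not Topic's actual code: "publishing" offers the element to
// each subscriber's bounded queue in turn. A fast subscriber may already have
// taken the element and moved on while the offer to a slower subscriber's
// full queue is still blocked, which is how one pipe ends up slightly ahead.
object NonAtomicPublishSketch {
  def publishToAll[A](subscriberQueues: List[Queue[IO, A]])(a: A): IO[Unit] =
    subscriberQueues.traverse_(_.offer(a)) // blocks only on whichever queue is full
}
```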
@SystemFw In that case, we may want to correct the current documentation of the broadcastThrough method.
The pipes are all run concurrently with each other, but note that elements are pulled from the source as chunks, and the next chunk is pulled only when all pipes are done with processing the current chunk, which prevents faster pipes from getting too far ahead.
The phrase "only when all pipes are done" would, to me, indicate "not until after all pipes are done processing the current chunk". However, that does not match the behaviour described in your comment.
For me, as an end user, there's an argument for two flavours of broadcastThrough to exist, e.g. a raceThrough for cases where performance is key and a coherentlyThrough where control trumps performance and it's necessary to process one chunk/element at a time. I can think of use cases for both semantics.
At present the documentation describes the coherentlyThrough flavour. In the short term, either documenting the bug or describing the current implementation accurately would be useful.
However, longer term, I would like to see the current implementation - which is somewhere in between and is probably not the semantics anyone would reasonably want - bifurcate into the two forms described above.
What do you mean by raceThrough? If you mean something that relaxes backpressure so that pipes can run ahead, you can do that by using prefetch on the pipes; you don't need a different version of broadcastThrough.
Sorry, yes, that's what I meant, no constraints on faster pipes running ahead by any number of elements. If that's already achievable via prefetch then great. Does that mean broadcastThrough & prefetch could be used in combination to control exactly how many elements/chunks a pipe could get ahead, from zero to unlimited? Zero is what I meant by "coherently" and unlimited is what I meant by "race".
If there is no need to constrain the progress, which is to say if there is no need to coordinate the progress of the pipes, then perhaps it would be enough to just dump them all in a horde?
~Maybe the following code snippet would achieve that... but I am not sure if you can just pass the same inputStream to many pipes and expect them to get the same outputs...~
// Stream.emits(pipes).map(_.apply(inputStream)).parJoinUnbounded
Edit: Here is the code of broadcastThrough at present: https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/Stream.scala#L224-L251. If you want a simplified version that does not add any coordination, maybe you could just take the hardcoded topic.subscribeAwait(1) and change it to a higher value. You could make it Int.MaxValue to remove the restriction altogether, but that would mean you can accumulate a lot of elements in there.
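For illustration, a simplified sketch of that change could look like the following (broadcastThroughN and maxQueued are made-up names; it omits the CountDownLatch the real implementation uses to ensure every pipe is subscribed before publishing starts, so it only shows where the look-ahead bound lives):

```scala
import cats.effect.kernel.Concurrent
import fs2.{Chunk, Pipe, Stream}
import fs2.concurrent.Topic

object BroadcastSketch {
  // Sketch only: without the startup latch, a subscriber that registers late
  // could miss early elements. Termination relies on topic.publish closing
  // the topic once the source ends, which ends the subscriber streams.
  def broadcastThroughN[F[_]: Concurrent, O, O2](
      source: Stream[F, O],
      maxQueued: Int
  )(pipes: Pipe[F, O, O2]*): Stream[F, O2] =
    Stream.eval(Topic[F, Chunk[O]]).flatMap { topic =>
      val publish: Stream[F, Nothing] =
        source.chunks.through(topic.publish)

      Stream
        .emits(pipes)
        .map { pipe =>
          Stream
            .resource(topic.subscribeAwait(maxQueued)) // the hardcoded 1, now a parameter
            .flatMap(_.flatMap(Stream.chunk).through(pipe))
        }
        .parJoinUnbounded
        .concurrently(publish)
    }
}
```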
Note the use of parJoinUnbounded, instead of parJoin(n), because the parJoin(n) combinator has a limitation: it does not start to pull from (the result stream of) pipe n+1 until at least one of the pipes between 1 and n has ended. With parJoinUnbounded, each pipe is run concurrently in a separate fiber and made to pump its results into a shared collector queue. However, this combinator alone makes your program's behaviour depend on the underlying "effect": you have no guarantee from the scheduler of your effect type that those fibers, the fibers running each pipe, are given equal or fair time, and it (the scheduler) may instead let one fiber, with one pipe, race too far ahead.
~This is where the broadcastThrough operator adds a Topic, as a buffer, to control and coordinate the progress of the pipes. Part of the reason to do so may be the requirements of the domain, such as keeping some "fairness" between the pipes: e.g. certain broadcasting services may require you not to skew in favour of some subscribers over others. Or simply to detach the timing semantics from the underlying effect...~
@SystemFw As an approximation, perhaps we could allow to configure the number of "look-ahead" elements in the topic? There is a "1" hardcoded in the source code of broadcastThrough
@diesalbla you can do this with prefetch though, without having to change anything.
from zero to unlimited?
@mn98 you can't go to zero, but you can increase it from 1 (which is the default) to whatever you want, including unlimited. However, remember that there is a reason for that backpressure, namely not accumulating too many elements in memory. If you prefetch and the pipe is slow, your memory usage will keep growing as elements accumulate in the Channel that backs prefetch.
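A sketch of that combination (pipe names and the bound of 16 are illustrative): prefetchN inserts a bounded buffer of up to n chunks between the Topic subscription and the slow pipe, so the publisher is not blocked by it, while the unbuffered pipe keeps the default look-ahead of 1.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.{Pipe, Stream}

object PrefetchedBroadcast {
  // Stand-ins for real processing; the sleep just makes one pipe slow.
  val slowPipe: Pipe[IO, Int, Int] = _.evalTap(_ => IO.sleep(10.millis))
  val fastPipe: Pipe[IO, Int, Int] = identity

  def broadcast(source: Stream[IO, Int]): Stream[IO, Int] =
    source.broadcastThrough(
      (in: Stream[IO, Int]) => in.prefetchN(16).through(slowPipe), // buffered: can lag behind
      fastPipe                                                     // unbuffered: default bound of 1
    )
}
```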
Great, makes sense, thanks very much both for sharing your thoughts on this! In conclusion I suppose it just remains for this issue to be addressed at some point.
Opened #2621 to, if not correct the behaviour and add the new variant, at least make the documentation a bit less inaccurate. @mn98 would that be more informative?
Changing the documentation of broadcastThrough would certainly help clarify the existing behaviour. However, I'd see this as also changing the existing semantics of the method, turning this issue from a 'bug' into a 'new feature'. If we subsequently add the 'new feature' as part of broadcastThrough, we'll have changed the semantics back to the original form again. Wouldn't users of the library find that surprising?