node
stream: fix `flatMap` concurrency
Fixes: https://github.com/nodejs/node/issues/52796
This PR causes `flatMap` to iterate over the stream and the mapped iterators in round-robin order. I am not sure this is the best approach, but I am also not sure there is a better one.
I am also not exactly sure how to test this.
I'd love to hear opinions from @nodejs/streams folks.
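For readers following along, here is a minimal sketch of what round-robin scheduling over several async iterables means. `roundRobin`, `letters`, `numbers`, and `collect` are hypothetical names used only for illustration; this is not the code in this PR and it ignores the `concurrency` option entirely.

```javascript
// Hypothetical helper (not the PR's implementation): take one item from each
// async iterable in turn until every one of them is exhausted.
async function* roundRobin(...sources) {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]());
  while (iterators.length > 0) {
    for (let i = 0; i < iterators.length; ) {
      const { done, value } = await iterators[i].next();
      if (done) {
        // Drop the exhausted iterator; do not advance i, the next one slid into place.
        iterators.splice(i, 1);
      } else {
        yield value;
        i++;
      }
    }
  }
}

// Illustrative inputs.
async function* letters() { yield 'a'; yield 'b'; yield 'c'; }
async function* numbers() { yield 1; yield 2; }

// Collect an async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const v of iterable) out.push(v);
  return out;
}

// collect(roundRobin(letters(), numbers())) resolves to ['a', 1, 'b', 2, 'c']
```

Note that items from different sources are interleaved, which is the ordering concern raised later in this thread.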
Review requested:
- [ ] @nodejs/streams
Did you check if the stage 2 proposal requires one way or another? I think it's quite important that our implementation matches the proposal.
When prototyping together we also explored two other ways to schedule the async iterators and this was nicest.
My main concern is that this complicates the regular flow for a single operator, but I think it's probably worth it as a bug fix and for correctness.
> Did you check if the stage 2 proposal requires one way or another? I think it's quite important that our implementation matches the proposal.
the section about concurrency doesn't mention much about implementation
There are many problems here. I think we need a different approach.
can you please elaborate? are your concerns regarding the approach or the implementation?
Maybe start with making some better tests?
I have to think a little what tests we can add, do you have any suggestion?
Also round robin is probably wrong since it won't enforce the order. We would need to add some other operator which does not enforce order.
> Also round robin is probably wrong since it won't enforce the order. We would need to add some other operator which does not enforce order.
Order is not enforced anyway with concurrency/flatMap? Round robin had the best fairness of the attempts we've had (the others being: prioritize the stream and only then read the iterator, or read the whole iterator, which is what we do today).
This isn't the implementation we started with, I think it can be improved significantly but there is no way around polling the inner flatMapped iterables and the input stream interchangeably?
(though it may make sense to enforce the order in flatMap with concurrency=1)
Iterators are strictly pull-based, and the default (possibly only) behavior when pulling twice from `x.flatMap(mapper)` will be to pull from `x` once, then from the first result of `mapper` twice (concurrently). It won't pull from the underlying iterator again until the first yielded iterator is exhausted, no matter how many times you pull. (But in many cases you can exhaust the iterator without actually needing to finish work on it first: consider pulling four times from `[0, 1, 2].values().toAsync().map(slowFn)`. The fourth promise can complete with `{ done: true }` even while `slowFn` is still running.)
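A rough sketch of the sequential part of that behavior, assuming plain async generators: the outer source is not pulled again until the current inner iterable is exhausted. `flatMapSequential`, `outer`, `inner`, and `run` are hypothetical names, and because async generators queue concurrent `next()` calls, this does not model pulling concurrently from the inner iterator.

```javascript
// Sketch only: a depth-first flatMap over async iterables.
async function* flatMapSequential(source, mapper) {
  for await (const item of source) {
    // The inner iterable is fully drained before the outer source is pulled again.
    yield* mapper(item);
  }
}

// Record the order in which the outer and inner iterators are pulled.
const log = [];

async function* outer() {
  for (const n of [1, 2]) {
    log.push(`outer ${n}`);
    yield n;
  }
}

async function* inner(n) {
  for (const s of ['a', 'b']) {
    log.push(`inner ${n}${s}`);
    yield `${n}${s}`;
  }
}

async function run() {
  const out = [];
  for await (const v of flatMapSequential(outer(), inner)) out.push(v);
  return { out, log };
}

// run() resolves with:
//   out: ['1a', '1b', '2a', '2b']
//   log: ['outer 1', 'inner 1a', 'inner 1b', 'outer 2', 'inner 2a', 'inner 2b']
```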
If I understand correctly, that matches the current behavior of stream's `flatMap`, modulo differences between an explicit `concurrency` parameter vs just pulling concurrently from the result. I'm hoping we can avoid having a concurrency parameter to `flatMap` for async iterators - you can always just do `.map(mapper).buffered(N).flatMap(x => x)` if you really want concurrency of the mapper function itself (or of the underlying iterator), where `buffered` is a helper which pulls multiple times. But these semantics aren't entirely worked out yet. (And unfortunately it's less obvious how to avoid such a parameter for consuming operators like `forEach`.)
Note that there are a few different things you might mean by "flattening" in the context of `flatMap`. There's a bunch of discussion of this in various places, but https://github.com/ReactiveX/rxjs/discussions/7429 is a place to start with some recent discussion. Not all of it applies to pull-based rather than push-based systems, but much of it does.
(Feel free to ping me for any questions about async iterator helpers. The proposal is decidedly not finished, so what's written in the repo is not going to be very helpful.)
> Order is not enforced anyway with concurrency/flatMap? Round robin had the best fairness of the attempts we've had (the others being: prioritize the stream and only then read the iterator, or read the whole iterator, which is what we do today).
Order is enforced. Not sure why you think otherwise?
What is the "logical" order of flatMap with concurrency?
The only non-streaming alternative with concurrency is we buffer the intermediate iterators?
> What is the "logical" order of flatMap with concurrency?
Items exit in the same order they arrive.
@ronag yes but items themselves are iterables of multiple items. If we want them to come in the same sub order we need to buffer all the sub iterations (without making them available to the consumer) while the user is waiting for the first iterable to complete.
What behavior would you expect (unordered, or ordered but buffering a lot)? Do you agree that the current behavior is kind of useless (w.r.t. concurrency) when yielding async iterables (see the example in https://github.com/nodejs/node/issues/52796)?
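To make the "ordered but buffers a lot" option concrete, here is a minimal sketch under stated assumptions: each inner iterable is drained into an array in the background, and the arrays are emitted in source order. `drain`, `flatMapOrderedBuffered`, `slowInner`, `source`, and `collect` are hypothetical names; error handling and backpressure are deliberately omitted, and nothing here is proposed as the actual implementation.

```javascript
// Hypothetical helper: buffer an entire async iterable into an array.
async function drain(iterable) {
  const items = [];
  for await (const item of iterable) items.push(item);
  return items;
}

// Sketch: up to `concurrency` inner iterables are drained at once, but their
// items are only released to the consumer in the order the inputs arrived.
async function* flatMapOrderedBuffered(source, mapper, concurrency) {
  const inFlight = []; // promises of buffered inner results, in source order
  for await (const item of source) {
    inFlight.push(drain(mapper(item)));
    if (inFlight.length >= concurrency) {
      yield* await inFlight.shift();
    }
  }
  while (inFlight.length > 0) {
    yield* await inFlight.shift();
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Lower n is slower, so later inner iterables tend to finish first.
async function* slowInner(n) {
  for (const s of ['a', 'b']) {
    await sleep((3 - n) * 10);
    yield `${n}${s}`;
  }
}

async function* source() { yield 1; yield 2; yield 3; }

async function collect(iterable) {
  const out = [];
  for await (const v of iterable) out.push(v);
  return out;
}

// collect(flatMapOrderedBuffered(source(), slowInner, 2)) resolves to
// ['1a', '1b', '2a', '2b', '3a', '3b'], even though inner 2 finishes before inner 1.
```

The cost is visible in `drain`: every item of a later inner iterable sits in memory until all earlier ones have completed.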
It is useless but correct IMHO
It flattens everything and applies concurrency on the flat stream of entries.
You can do an inner flat map on each stream or buffering to achieve concurrency.
> It flattens everything and applies concurrency on the flat stream of entries.
That is not the current behavior. It currently applies concurrency to getting/creating an iterator from the mapping function, which is synchronous in the case of async iterators.
@ronag so you would prefer we buffer the inner async iterables and yield them "in order" from flatMap? In terms of scheduling the sub-tasks do you have a better idea than round robin?
Not round robin. Depth first.
@ronag how would that work with concurrency and @MoLow 's async generator case?
> @ronag how would that work with concurrency and @MoLow 's async generator case?
Flatten and apply concurrency on the flat stream of events? Basically concatMap followed by mergeMap