stream: fix `flatMap` concurrency

Open • MoLow opened this issue 10 months ago • 20 comments

Fixes: https://github.com/nodejs/node/issues/52796

This PR makes `flatMap` iterate over the stream and the mapped iterators in round-robin order. I am not sure this is the best approach, but I am also not sure there is a better one. I am also not exactly sure how to test this; I'd love to hear opinions from @nodejs/streams folks.
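A minimal sketch of round-robin scheduling, written as a standalone async generator rather than the PR's actual change to Node.js internals. The names `roundRobinFlatMap`, `iteratorOf`, and `startPull` and the `(source, fn, concurrency)` signature are hypothetical. It keeps up to `concurrency` inner iterators open, keeps one pull in flight per open iterator, emits items by visiting the open iterators in rotation, and pulls from the source again only when an inner iterator finishes.

```javascript
// Hypothetical helper, not a Node.js API: get an iterator from either an
// async iterable or a sync iterable (arrays, Sets, ...).
function iteratorOf(iterable) {
  return iterable[Symbol.asyncIterator]
    ? iterable[Symbol.asyncIterator]()
    : iterable[Symbol.iterator]();
}

// Start a pull and mark its rejection as handled; it is awaited later.
function startPull(it) {
  const p = Promise.resolve(it.next());
  p.catch(() => {});
  return p;
}

// Sketch only: round-robin flatMap. Error handling and closing the open
// iterators on early exit are omitted.
async function* roundRobinFlatMap(source, fn, concurrency = 1) {
  const outer = iteratorOf(source);
  let outerDone = false;
  const open = []; // { it, pending } for each open inner iterator

  // Pull from the source until `concurrency` inner iterators are open.
  async function refill() {
    while (!outerDone && open.length < concurrency) {
      const { value, done } = await outer.next();
      if (done) {
        outerDone = true;
      } else {
        const it = iteratorOf(fn(value));
        open.push({ it, pending: startPull(it) });
      }
    }
  }

  await refill();
  let i = 0;
  while (open.length > 0) {
    if (i >= open.length) i = 0; // wrap around: start the next round
    const slot = open[i];
    const { value, done } = await slot.pending;
    if (done) {
      open.splice(i, 1); // the following iterator now sits at index i
      await refill();
      continue;
    }
    slot.pending = startPull(slot.it); // keep one pull in flight
    yield value;
    i++;
  }
}
```

With two sources whose inner generators each yield two items, `concurrency = 2` produces `a0, b0, a1, b1`, while `concurrency = 1` degrades to the sequential order `a0, a1, b0, b1`.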

MoLow, May 03 '24 09:05

Review requested:

  • [ ] @nodejs/streams

nodejs-github-bot, May 03 '24 09:05

Did you check whether the stage 2 proposal requires one behavior or the other? I think it's quite important that our implementation matches the proposal.

aduh95, May 03 '24 09:05

When prototyping together, we also explored two other ways to schedule the async iterators, and this one was the nicest.

My main concern is that this complicates the regular flow for a single operator, but I think it’s probably worth it as a bug fix and for correctness.

benjamingr, May 03 '24 09:05

Did you check whether the stage 2 proposal requires one behavior or the other? I think it's quite important that our implementation matches the proposal.

The section about concurrency doesn't say much about implementation.

There are many problems here. I think we need a different approach.

Can you please elaborate? Are your concerns about the approach or the implementation?

Maybe start with making some better tests?

I need to think a little about what tests we can add. Do you have any suggestions?

MoLow, May 03 '24 10:05

Also, round robin is probably wrong since it won't enforce the order. We would need to add some other operator that does not enforce order.

ronag, May 03 '24 10:05

Also, round robin is probably wrong since it won't enforce the order. We would need to add some other operator that does not enforce order.

Isn't order already not enforced with concurrency/flatMap? Round robin had the best fairness of the approaches we tried (the others were prioritizing the stream and only then reading the iterator, or reading the whole iterator, which is what we do today).

This isn't the implementation we started with, and I think it can be improved significantly, but there is no way around polling the inner flatMapped iterables and the input stream in alternation, is there?

(Though it may make sense to enforce the order in flatMap with concurrency=1.)

benjamingr, May 03 '24 16:05

Iterators are strictly pull-based, and the default (possibly only) behavior when pulling twice from x.flatMap(mapper) will be to pull from x once, then from the first result of mapper twice (concurrently). It won't pull from the underlying iterator again until the first yielded iterator is exhausted, no matter how many times you pull. (But, in many cases you can exhaust the iterator without actually needing to finish work on it first - consider pulling four times from [0, 1, 2].values().toAsync().map(slowFn). The fourth promise can complete with { done: true } even while slowFn is still running.)
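To make this pull order concrete, here is a plain async-generator `flatMap` that records when it pulls from the outer iterator versus the inner one. `loggingFlatMap` and `demo` are hypothetical helpers for illustration, not the proposal's spec text or Node.js code.

```javascript
// Hypothetical sketch: sequential (depth-first) flatMap that logs its pulls.
async function* loggingFlatMap(source, mapper, log) {
  for await (const x of source) {
    log.push(`outer:${x}`); // one pull from the outer iterator
    for await (const y of mapper(x)) {
      log.push(`inner:${y}`); // pulls from the current inner iterator
      yield y;
    }
    // The outer loop pulls again only after the inner iterator is exhausted.
  }
}

async function demo() {
  const log = [];
  const values = [];
  for await (const v of loggingFlatMap([1, 2], (x) => [x * 10, x * 10 + 1], log)) {
    values.push(v);
  }
  return { values, log };
}
```

Running `demo()` yields the values `10, 11, 20, 21` and records `outer:1, inner:10, inner:11, outer:2, inner:20, inner:21`: the outer iterator is not touched again until the first inner iterator is done.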

If I understand correctly, that matches the current behavior of stream's flatMap, modulo differences between an explicit concurrency parameter vs just pulling concurrently from the result. I'm hoping we can avoid having a concurrency parameter to flatMap for async iterators - you can always just do .map(mapper).buffered(N).flatMap(x => x) if you really want concurrency of the mapper function itself (or of the underlying iterator), where buffered is a helper which pulls multiple times. But these semantics aren't entirely worked out yet. (And unfortunately it's less obvious how to avoid such a parameter for consuming operators like forEach.)
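The `.map(mapper).buffered(N).flatMap(x => x)` pipeline can be approximated with plain functions. `mapConcurrent`, `buffered`, and `flatten` below are hypothetical stand-ins: the async iterator helpers proposal is unfinished, and `buffered` is not a real API. The key assumption is that `mapConcurrent` starts `mapper` on every `next()` call without waiting for earlier calls; an async generator would instead queue concurrent `next()` calls and run them one at a time.

```javascript
// Hypothetical stand-ins for `.map(mapper).buffered(N).flatMap(x => x)`;
// none of these are real Node.js or proposal APIs.

// Map over a sync iterable. Each next() call starts mapper(value) right away,
// so several calls can run at once if the consumer pulls concurrently.
function mapConcurrent(iterable, mapper) {
  const it = iterable[Symbol.iterator]();
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      const { value, done } = it.next();
      if (done) return Promise.resolve({ value: undefined, done: true });
      return Promise.resolve(mapper(value)).then((v) => ({ value: v, done: false }));
    },
  };
}

// Keep up to n pulls in flight; yield results in the order they were pulled.
async function* buffered(asyncIterable, n) {
  const it = asyncIterable[Symbol.asyncIterator]();
  const queue = [];
  const pull = () => {
    const p = it.next();
    p.catch(() => {}); // rejection is surfaced when the promise is awaited
    queue.push(p);
  };
  for (let i = 0; i < n; i++) pull();
  while (queue.length > 0) {
    const { value, done } = await queue.shift();
    if (done) return;
    pull(); // refill the window before handing the value out
    yield value;
  }
}

// flatMap(x => x): yield every item of every inner iterable, in order.
async function* flatten(iterables) {
  for await (const inner of iterables) yield* inner;
}
```

For example, `flatten(buffered(mapConcurrent([1, 2, 3, 4, 5], mapper), 3))` with an async `mapper` returning `[x, x * 10]` keeps at most three `mapper` calls running at once and still yields `1, 10, 2, 20, ...` in source order.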

Note that there are a few different things you might mean by "flattening" in the context of flatMap. There's a bunch of discussion of this in various places, but https://github.com/ReactiveX/rxjs/discussions/7429 is a place to start with some recent discussion. Not all of it applies to pull-based rather than push-based systems, but much of it does.

(Feel free to ping me for any questions about async iterator helpers. The proposal is decidedly not finished, so what's written in the repo is not going to be very helpful.)

bakkot, May 04 '24 20:05

Isn't order already not enforced with concurrency/flatMap? Round robin had the best fairness of the approaches we tried (the others were prioritizing the stream and only then reading the iterator, or reading the whole iterator, which is what we do today).

Order is enforced. Not sure why you think otherwise?

ronag, May 05 '24 07:05

What is the "logical" order of flatMap with concurrency?

benjamingr, May 05 '24 09:05

Is the only non-streaming alternative with concurrency to buffer the intermediate iterators?

benjamingr, May 05 '24 10:05

What is the "logical" order of flatMap with concurrency?

Items exit in the same order they arrive.

ronag, May 05 '24 13:05

@ronag yes, but the items themselves are iterables of multiple items. If we want their contents to come out in the same sub-order, we need to buffer all the sub-iterations (without making them available to the consumer) while the user is waiting for the first iterable to complete.

What behavior would you expect (unordered, or ordered but buffering a lot)? Do you agree that the current behavior is kind of useless (w.r.t. concurrency) when yielding async iterables (see the example in https://github.com/nodejs/node/issues/52796)?
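For comparison, the "ordered but buffers a lot" option can be sketched as follows. `drainToArray` and `orderedBufferedFlatMap` are hypothetical names, not Node.js code: up to `concurrency` inner iterables are drained into arrays at the same time, and each array is emitted only after every earlier one, so output keeps source order at the cost of holding whole inner results in memory.

```javascript
// Hypothetical helper: read an (async) iterable to completion into an array.
async function drainToArray(iterable) {
  const items = [];
  for await (const item of iterable) items.push(item);
  return items;
}

// Sketch: ordered flatMap that drains up to `concurrency` inner iterables
// concurrently and yields their buffered items in source order.
async function* orderedBufferedFlatMap(source, fn, concurrency = 1) {
  const pending = []; // promises of buffered inner results, in source order
  for await (const x of source) {
    const p = drainToArray(fn(x));
    p.catch(() => {}); // rejection is surfaced when awaited below
    pending.push(p);
    if (pending.length >= concurrency) {
      const items = await pending.shift(); // wait for the oldest inner iterable
      yield* items;
    }
  }
  while (pending.length > 0) {
    const items = await pending.shift();
    yield* items;
  }
}
```

If the first inner async generator is the slowest, the others finish first, but nothing from them is yielded until the first one's buffered items have been emitted.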

benjamingr, May 05 '24 13:05

It is useless but correct IMHO

ronag, May 05 '24 13:05

It flattens everything and applies concurrency on the flat stream of entries.

ronag, May 05 '24 13:05

You can do an inner flat map on each stream or buffering to achieve concurrency.

ronag, May 05 '24 13:05

It flattens everything and applies concurrency on the flat stream of entries.

That is not the current behavior. It currently applies concurrency to getting/creating an iterator from the mapping function, which is synchronous in the case of async iterators.
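The claim that creating the iterator is synchronous can be checked directly in plain JavaScript. This is an illustration, not Node.js internals, and `slowInner` is a hypothetical mapper: calling an async generator function returns its iterator immediately without running any of its body, so a concurrency limit applied only to that call does not bound the actual work.

```javascript
const events = [];

// Hypothetical mapper returning an async generator, as in the linked issue.
async function* slowInner(x) {
  events.push(`start ${x}`); // runs only once the iterator is first pulled
  await new Promise((resolve) => setTimeout(resolve, 10));
  yield x;
}

// "Mapping" every item finishes synchronously: no body has started yet.
const iterators = [1, 2, 3].map((x) => slowInner(x));
events.push('all created');

// The first next() call runs slowInner(1)'s body up to its first await.
const firstResult = iterators[0].next();
events.push('pulled first');

// events is now ['all created', 'start 1', 'pulled first'].
```

All three iterators exist before any work has started; the real work only begins when something pulls from them.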

MoLow, May 05 '24 13:05

@ronag so you would prefer we buffer the inner async iterables and yield them "in order" from flatMap? In terms of scheduling the sub-tasks, do you have a better idea than round robin?

benjamingr, May 05 '24 13:05

Not round robin. Depth first.

ronag, May 05 '24 13:05

@ronag how would that work with concurrency and @MoLow's async generator case?

benjamingr, May 05 '24 15:05

@ronag how would that work with concurrency and @MoLow's async generator case?

Flatten and apply concurrency on the flat stream of events? Basically concatMap followed by mergeMap.
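One reading of "concatMap followed by mergeMap" can be sketched with the existing experimental stream operators, assuming a recent Node.js release that ships `readable.flatMap()`, `readable.map()` with the `concurrency` option, and `readable.toArray()`. `flatMap` with its default concurrency of 1 flattens depth first, and concurrency is then applied per flat item by `map`. `expand` and `work` are hypothetical. Unlike RxJS `mergeMap`, the stream `map` operator keeps results in source order.

```javascript
const { Readable } = require('node:stream');

// Hypothetical mapper: each source value expands into two items.
async function* expand(n) {
  yield `${n}.a`;
  yield `${n}.b`;
}

let active = 0;
let maxActive = 0;

// Hypothetical per-item work; tracks how many calls overlap.
async function work(item) {
  active++;
  maxActive = Math.max(maxActive, active);
  await new Promise((resolve) => setTimeout(resolve, 5));
  active--;
  return item.toUpperCase();
}

async function run() {
  return Readable.from([1, 2])
    .flatMap(expand) // depth first: 1.a, 1.b, 2.a, 2.b
    .map(work, { concurrency: 2 }) // at most two `work` calls at a time
    .toArray();
}
```

`run()` resolves to `['1.A', '1.B', '2.A', '2.B']`: the inner generators are flattened in order, and only the per-item work runs concurrently.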

ronag, May 05 '24 15:05