
Allow multiple readers to iterate over a stream at the same time

Open ehmicky opened this issue 1 year ago • 2 comments

What is the problem this feature will solve?

When multiple callers use Readable[Symbol.asyncIterator] on the same stream, each caller receives only a subset of the chunks.

import {Readable} from 'node:stream'

const stream = Readable.from(['a', 'b', 'c'])

const iterate = async (readerName) => {
  for await (const chunk of stream) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  iterate('one'),
  iterate('two'),
])

This logs:

one a
two b
one c

Some callers might expect that result. But others might expect the following result instead.

one a
two a
one b
two b
one c
two c

What is the feature you are proposing to solve the problem?

Adding an option to readable.iterator() that makes it consume the stream with readable.on('data') instead of readable.read(). This would enable the behavior shown above.
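
A rough sketch of what that opt-in could look like. The events option name below is hypothetical, purely to illustrate the proposal; it is not an existing Node.js API.

import {Readable} from 'node:stream'

const stream = Readable.from(['a', 'b', 'c'])

// Hypothetical option: `events: true` would make each iterator consume the
// stream through stream.on('data'), so every consumer sees every chunk.
const iterate = async (readerName) => {
  for await (const chunk of stream.iterator({events: true})) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  iterate('one'),
  iterate('two'),
])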

What alternatives have you considered?

Implementing this in userland.

See an example of it at https://github.com/sindresorhus/get-stream/pull/121
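
For reference, a minimal userland sketch of that approach, heavily simplified compared to the get-stream implementation linked above. It assumes a Node.js version where events.on() supports the signal option.

import {on} from 'node:events'
import {Readable} from 'node:stream'
import {finished} from 'node:stream/promises'

// Each consumer gets its own events.on() iterator over 'data' events, so all
// consumers see every chunk. Cleanup and error handling are intentionally naive.
const iterateOnData = async function* (stream) {
  const controller = new AbortController()
  let streamError
  // Stop the events.on() iterator once the stream ends or errors.
  finished(stream).then(
    () => controller.abort(),
    (error) => {
      streamError = error
      controller.abort()
    },
  )
  try {
    for await (const [chunk] of on(stream, 'data', {signal: controller.signal})) {
      yield chunk
    }
  } catch (error) {
    // events.on() throws an AbortError when the signal fires; surface the
    // underlying stream error instead, if there was one.
    if (!controller.signal.aborted) {
      throw error
    }
    if (streamError !== undefined) {
      throw streamError
    }
  }
}

const stream = Readable.from(['a', 'b', 'c'])

const logAll = async (readerName) => {
  for await (const chunk of iterateOnData(stream)) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  logAll('one'),
  logAll('two'),
])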

ehmicky avatar Mar 14 '24 16:03 ehmicky

Hi @ehmicky, I had a look at your implementation and realised you took the effort to make it work with both Node 18 and 20. I think if this is implemented in Node.js core it shouldn't have any backward-compatibility concerns, right (since it would be a breaking change)? Correct me if I am wrong.

jakecastelli avatar Jun 29 '24 15:06 jakecastelli

Hi @jakecastelli,

The implementation in get-stream works with Node 18, 20 and 22. However, it has some subtle differences and breaking changes compared to the current implementation in Node.js:

  • The stream is put in flowing mode.
  • Error handling and stream termination detection might behave slightly differently, since it relies on events.on() instead of stream.finished(). For example, multiple errors are currently aggregated.
  • Different event listeners are set up on the stream, which might be breaking if some users directly manipulate stream event listeners of specific types.
  • The point at which stream.destroy() is called might be slightly different. Also, it does not take the destroyOnReturn option into account, nor whether autoDestroy is set.

Overall, IMHO, it would be safer to keep the current Node.js implementation and provide this new behavior as an opt-in, for backward compatibility, for example through a boolean option to stream.iterator().


As a side note, the implementation in get-stream does not pass the highWaterMark option to events.on() (see https://github.com/sindresorhus/get-stream/pull/125), because get-stream consumes the iterable synchronously and right away.

However, since events.on() buffers incoming chunks, it might be a good idea to let the user specify the highWaterMark option, in case they are consuming the iterable too slowly. This would automatically pause/resume the stream to prevent the buffer from growing without bound. Note that this is different from stream.readableHighWaterMark: it is measured in number of chunks (not number of bytes), and it paces events.on() (not stream.on('data')). This might be confusing to some users, so I am not sure whether a different name would help clear that confusion.
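
A small sketch of how that could look today with events.on() directly, for a slow consumer. The highWaterMark/lowWaterMark options of events.on() need a recent Node.js version, and the termination and error handling below is simplified.

import {on} from 'node:events'
import {Readable} from 'node:stream'
import {finished} from 'node:stream/promises'
import {setTimeout as delay} from 'node:timers/promises'

const stream = Readable.from(['a', 'b', 'c'])
const controller = new AbortController()

// Abort the events.on() iterator when the stream ends or errors.
// (Stream errors are swallowed here for brevity.)
finished(stream).then(() => controller.abort(), () => controller.abort())

try {
  // Unlike stream.readableHighWaterMark, this highWaterMark counts buffered
  // events (chunks), not bytes: events.on() pauses the stream whenever more
  // than 2 chunks are waiting, and resumes it once they have been consumed.
  const options = {signal: controller.signal, highWaterMark: 2}
  for await (const [chunk] of on(stream, 'data', options)) {
    await delay(100) // simulate a slow consumer
    console.log(chunk)
  }
} catch (error) {
  // events.on() throws an AbortError once the signal fires at stream end.
  if (!controller.signal.aborted) {
    throw error
  }
}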

ehmicky avatar Jun 29 '24 18:06 ehmicky

My team is claiming this issue.

mycoleb avatar Jul 01 '24 02:07 mycoleb

@nodejs/streams should Readable have a tee() helper/method like webstream readable?

MoLow avatar Jul 01 '24 06:07 MoLow

@mycoleb your what is what?

benjamingr avatar Jul 01 '24 06:07 benjamingr

@nodejs/streams should Readable have a tee() helper/method like webstream readable?

Maybe? It's just tricky to get the semantics right, namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure, etc., and we need to decide what happens when one "fork" is flowing and one is not.

benjamingr avatar Jul 01 '24 06:07 benjamingr
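
For context, a minimal sketch of the webstream tee() behavior being referenced, using the WHATWG API Node.js already exposes. Readable.toWeb() is still documented as experimental, and as noted above this does not solve backpressure between branches.

import {Readable} from 'node:stream'

// tee() gives two branches that each receive every chunk.
const [branchA, branchB] = Readable.toWeb(Readable.from(['a', 'b', 'c'])).tee()

// Caveat from the discussion above: tee() buffers internally, so if one
// branch is consumed much more slowly than the other, chunks pile up.
const readAll = async (readerName, branch) => {
  for await (const chunk of branch) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  readAll('one', branchA),
  readAll('two', branchB),
])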

Maybe? It's just tricky to get the semantics right, namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure, etc., and we need to decide what happens when one "fork" is flowing and one is not.

I didn't say it is simple, but this is a very common need

MoLow avatar Jul 01 '24 06:07 MoLow

Yeah my point is we shouldn't land an implementation that doesn't deal with these things

benjamingr avatar Jul 01 '24 06:07 benjamingr

Unfortunately, using on('data') would be complex and prone to errors due to backpressure requirements. Using .read() and on('readable') simplified quite a lot of that handling.

A proper implementation of this is exceptionally hard. https://www.npmjs.com/package/cloneable-readable works, but it's complex, and I couldn't compress it down into a nice API like .tee() without adding more state variables.

I'm not opposed to adding cloneable-readable to Node.js core, or to moving it to the Node.js org, as I have always considered it part of my activities in Node.js.

mcollina avatar Jul 01 '24 08:07 mcollina
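
For readers unfamiliar with the package, a rough sketch of how cloneable-readable is used, based on its README. Its examples consume the streams with pipe(); async iteration is used here only to keep the sketch short.

import {Readable} from 'node:stream'
import cloneable from 'cloneable-readable'

// Clones must be created before the data starts flowing.
const stream = cloneable(Readable.from(['a', 'b', 'c']))
const clone = stream.clone()

const readAll = async (readerName, readable) => {
  for await (const chunk of readable) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  readAll('one', stream),
  readAll('two', clone),
])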

There has been no activity on this feature request for 5 months. To help maintain relevant open issues, please add the https://github.com/nodejs/node/labels/never-stale label or close this issue if it should be closed. If not, the issue will be automatically closed 6 months after the last non-automated comment. For more information on how the project manages feature requests, please consult the feature request management document.

github-actions[bot] avatar Dec 29 '24 01:12 github-actions[bot]