
stream: Readable.concat util method

Open tugrul opened this issue 3 years ago • 4 comments

There are many npm modules for this functionality, and I think it is a common requirement: the multistream npm package alone sees 452k weekly downloads.

tugrul avatar Sep 10 '21 21:09 tugrul

cc @nodejs/streams

tniessen avatar Sep 11 '21 00:09 tniessen

@mcollina I looked into it quickly; we only need to destroy the remaining readable streams when an error occurs while reading an earlier one, and only if the argument is an Array. This check is not necessary for other iterables, because those streams are initialized lazily. We have no control over the writable stream in this scenario. Either of the following solutions could solve the problem.

async function * streamSeriesIterator(streams) {
  let index = 0;

  try {
    for await (const stream of streams) {
      index++;
      yield* stream;
    }
  } finally {
    if (streams instanceof Array) {
      streams.slice(index).forEach(stream => stream.destroy());
    }
  }
}

or

async function * streamSeriesIterator(streams) {
  let index = 0;

  try {
    for await (const stream of streams) {
      index++;
      yield* stream;
    }
  } catch (err) {
    if (streams instanceof Array) {
      streams.slice(index).forEach(stream => stream.destroy(err));
    }
    
    throw err;
  }
}

tugrul avatar Sep 11 '21 12:09 tugrul

As an alternative suggestion... consider adding this utility to the stream/consumers module and making it work with any stream.Readable, ReadableStream, and async iterable...

import { concat, arrayBuffer } from 'node:stream/consumers';

const readable1 = new ReadableStream();
const readable2 = new ReadableStream();
const readable3 = new stream.Readable();

for await (const chunk of concat(readable1, readable2, readable3)) {
  console.log(chunk);
}

// Let's say you want all of the contents in a single ArrayBuffer...

await arrayBuffer(concat(readable1, readable2, readable3));

The return value would be just an async iterator. If someone wanted a stream.Readable from the results, it would be as simple as... stream.Readable.from(concat(readable1, readable2, readable3)) ... getting a ReadableStream from it is a bit more complicated but also straightforward...

const readable = new ReadableStream({
  async pull(c) {
    for await (const chunk of concat(readable1, readable2, readable3))
      c.enqueue(chunk);
    c.close();
  }
});

jasnell avatar Sep 11 '21 15:09 jasnell

@tugrul unfortunately https://github.com/nodejs/node/pull/40075#issuecomment-917401863 does not work for node streams. They can error at any time even if they are not being consumed, you need to add on('error') handlers.

mcollina avatar Sep 11 '21 19:09 mcollina