stream: Readable.concat util method
There are many npm modules for this functionality, and I think it is a common requirement: the multistream npm package alone sees 452k weekly downloads.
cc @nodejs/streams
@mcollina I took a quick look, and we only need to destroy the remaining readable streams when the input is an Array and an error occurs while reading an earlier stream. This check is not necessary for iterables because the streams are initialized lazily. We have no control over the writable stream in this scenario. Either of the following solutions could solve the problem.
```js
async function * streamSeriesIterator(streams) {
  let index = 0;
  try {
    for await (const stream of streams) {
      index++;
      yield* stream;
    }
  } finally {
    // If the input is an Array, destroy the streams we never reached.
    if (streams instanceof Array) {
      streams.slice(index).forEach(stream => stream.destroy());
    }
  }
}
```
or
```js
async function * streamSeriesIterator(streams) {
  let index = 0;
  try {
    for await (const stream of streams) {
      index++;
      yield* stream;
    }
  } catch (err) {
    // Propagate the error into the streams we never reached.
    if (streams instanceof Array) {
      streams.slice(index).forEach(stream => stream.destroy(err));
    }
    throw err;
  }
}
```
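As a usage sketch (my illustration, not part of the proposal itself): the generator can be wrapped with `stream.Readable.from()` to get back a single concatenated readable. The file names here are hypothetical.

```js
import { Readable } from 'node:stream';
import fs from 'node:fs';

// Hypothetical example: concatenate three files in series. If reading
// b.txt errors, the cleanup block above destroys the stream for c.txt
// instead of leaving it open.
const combined = Readable.from(streamSeriesIterator([
  fs.createReadStream('a.txt'),
  fs.createReadStream('b.txt'),
  fs.createReadStream('c.txt'),
]));

combined.pipe(process.stdout);
```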
As an alternative suggestion... consider adding this utility to the `stream/consumers` module and making it work with any `stream.Readable`, `ReadableStream`, and async iterable...
```js
// `concat` here is the proposed utility; `arrayBuffer` already exists
// in node:stream/consumers.
import { concat, arrayBuffer } from 'node:stream/consumers';
import stream from 'node:stream';

const readable1 = new ReadableStream();
const readable2 = new ReadableStream();
const readable3 = new stream.Readable();

for await (const chunk of concat(readable1, readable2, readable3)) {
  console.log(chunk);
}

// Let's say you want all of the contents in a single ArrayBuffer...
await arrayBuffer(concat(readable1, readable2, readable3));
```
The return value would be just an async iterator. If someone wanted a `stream.Readable` from the results, it would be as simple as `stream.Readable.from(concat(readable1, readable2, readable3))`... getting a `ReadableStream` from it is a bit more complicated but also straightforward...
```js
const readable = new ReadableStream({
  async pull(c) {
    for await (const chunk of concat(readable1, readable2, readable3)) {
      c.enqueue(chunk);
    }
    c.close();
  }
});
```
@tugrul unfortunately https://github.com/nodejs/node/pull/40075#issuecomment-917401863 does not work for Node streams. They can error at any time, even if they are not being consumed, so you need to add on('error') handlers.
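To illustrate the point (a minimal sketch of mine, not code from this thread): when an Array of Node streams is accepted, each pending stream needs an 'error' handler attached up front, otherwise a stream that errors before iteration reaches it emits an uncaught 'error' event.

```js
// Sketch: attach 'error' handlers immediately so streams that fail
// while idle cannot crash the process, then surface the recorded
// error once iteration reaches the failed stream.
async function * concatWithEarlyErrors(streams) {
  const errors = new Map();
  for (const stream of streams) {
    stream.on('error', (err) => errors.set(stream, err));
  }
  try {
    for (const stream of streams) {
      const err = errors.get(stream);
      if (err) throw err; // errored before we got to it
      yield* stream;
    }
  } finally {
    // Destroy everything; destroying an already-ended stream is safe.
    for (const stream of streams) stream.destroy();
  }
}
```

As with the earlier variants, the result can be wrapped with `stream.Readable.from()` if a `stream.Readable` is wanted.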