suggestion: `concatReadableStreams()`
Is your feature request related to a problem? Please describe.
I want to combine multiple ReadableStream instances into a single ReadableStream instance where each stream is read fully in sequence.
Describe the solution you'd like
A function to go along with earlyZipReadableStreams, mergeReadableStreams, and zipReadableStreams, but one that processes streams in order, one at a time.
e.g.
function concatReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
return new ReadableStream({
async start(controller) {
try {
for (const stream of streams) {
for await (const chunk of stream) {
controller.enqueue(chunk);
}
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
}
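For example, a minimal usage sketch (the chunk values are made up, and it assumes a runtime where ReadableStream.from() and Array.fromAsync() are available, e.g. recent Deno):

const a = ReadableStream.from([1, 2, 3]);
const b = ReadableStream.from([4, 5, 6]);
// Chunks from `a` are emitted first, then chunks from `b`.
const combined = concatReadableStreams(a, b);
console.log(await Array.fromAsync(combined)); // [1, 2, 3, 4, 5, 6]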
Describe alternatives you've considered
I wonder if this isn't already included because there's some reason not to do this with streams, or because there's already a simple way to do it built into the JS Streams API that I have yet to find.
While composing multiple readable streams into a single one, I found that in some cases I need to generate the streams asynchronously. This led me to a different implementation with a signature more like ReadableStream.from() instead of like mergeReadableStreams:
type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
function concatReadableStreams<T>(
streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
return new ReadableStream({
async start(controller) {
try {
if (Symbol.asyncIterator in streams) {
for await (const stream of streams) {
for await (const chunk of stream) {
controller.enqueue(chunk);
}
}
} else {
// use `for...of` instead of `for await...of` to call sync generator `finally` blocks
for (const stream of streams) {
for await (const chunk of await stream) {
controller.enqueue(chunk);
}
}
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
}
@@ -1,12 +1,23 @@
+type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
+
function concatReadableStreams<T>(
- ...streams: ReadableStream<T>[]
+ streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
return new ReadableStream({
async start(controller) {
try {
- for (const stream of streams) {
- for await (const chunk of stream) {
- controller.enqueue(chunk);
+ if (Symbol.asyncIterator in streams) {
+ for await (const stream of streams) {
+ for await (const chunk of stream) {
+ controller.enqueue(chunk);
+ }
+ }
+ } else {
+ // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
+ for (const stream of streams) {
+ for await (const chunk of await stream) {
+ controller.enqueue(chunk);
+ }
}
}
controller.close();
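For example, a hedged usage sketch of this iterable-based signature with an async generator producing the streams (the file names are hypothetical):

async function* fileStreams() {
  yield (await Deno.open("a.txt")).readable;
  yield (await Deno.open("b.txt")).readable;
}

await concatReadableStreams(fileStreams()).pipeTo(
  (await Deno.create("combined.txt")).writable,
);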
Hmmm… I found a different way to concatenate streams by actually using ReadableStream.from() and then using pipeThrough() with a TransformStream to flatten it. I'm not sure yet which solution I prefer though.
class FlatStream<T> extends TransformStream<ReadableStream<T>, T> {
constructor() {
super({
async transform(stream, controller) {
for await (const chunk of stream) {
controller.enqueue(chunk);
}
},
});
}
}
type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
function concatReadableStreams<T>(
streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
return ReadableStream.from(streams).pipeThrough(new FlatStream());
}
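A small usage sketch of the FlatStream approach on its own (the chunk values are made up):

const flattened = ReadableStream.from([
  ReadableStream.from(["a", "b"]),
  ReadableStream.from(["c"]),
]).pipeThrough(new FlatStream<string>());

console.log(await Array.fromAsync(flattened)); // ["a", "b", "c"]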
Are you able to provide a few use cases for this suggestion?
Examples:
- streaming multiple files into one stream (e.g. to a file or network response)
- adding a content prefix to a response stream from a fetch() Response (see the sketch below)

Keeping things as streams reduces max memory usage, etc.
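A hedged sketch of the second use case, assuming the variadic signature proposed above (the upstream URL is made up):

Deno.serve(async () => {
  const upstream = await fetch("https://example.com/data.txt");
  // Prepend a prefix chunk to the proxied response body without buffering it.
  const body = concatReadableStreams(
    ReadableStream.from([new TextEncoder().encode("prefix\n")]),
    upstream.body!,
  );
  return new Response(body);
});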
Seems reasonable to have 👍🏾
Isn't the structure below easy/straightforward enough to understand?
for await (const stream of streams) {
for await (const chunk of stream) {
// process chunk
}
}
I don't see much point in adding this as a util.
@kt3k with the for…await loops how do you use it with pipeTo() and/or pipeThrough()?
If I want to process chunks then the looping makes perfect sense to me, but if I want to stream multiple readable streams combined together to a file or to a network response, then the loops themselves are not helpful (although they may be part of the solution).
Fair enough
> @kt3k with the `for…await` loops how do you use it with `pipeTo()` and/or `pipeThrough()`? If I want to process chunks then the looping makes perfect sense to me, but if I want to stream multiple readable streams combined together to a file or to a network response, then the loops themselves are not helpful (although they may be part of the solution).
await ReadableStream.from(async function* () {
for await (const stream of streams)
for await (const chunk of stream)
yield chunk
}())
.pipeThrough(new CompressionStream('gzip'))
.pipeTo((await Deno.create('abc.gz')).writable)
This class would allow people to either pass an iterable of a bunch of streams to it upon construction or use it within a .pipeThrough method.
type Streams<T> = AsyncIterable<ReadableStream<T>> | Iterable<ReadableStream<T>>
class ConcatStreams<T> {
#readable: ReadableStream<T>
#writable: WritableStream<Streams<T>>
constructor(streams?: Streams<T>) {
const { readable, writable } = new TransformStream()
this.#readable = readable
this.#writable = writable
if (streams)
ReadableStream.from(async function* () {
for await (const stream of streams)
for await (const chunk of stream)
yield chunk
}()).pipeTo(this.#writable)
}
get readable(): ReadableStream<T> {
return this.#readable
}
get writable(): WritableStream<Streams<T>> {
return this.#writable
}
}
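For example, a hedged usage sketch of the constructor path (the file names are hypothetical):

const concatenated = new ConcatStreams([
  (await Deno.open("a.txt")).readable,
  (await Deno.open("b.txt")).readable,
]).readable;

await concatenated.pipeTo(Deno.stdout.writable);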
I closed my PR to open it up for contributors. PRs are welcome! Feel free to use the tests I wrote in #4723.
When the output ReadableStream is canceled, the inputs should be canceled too, so that the code below does not leak resources.
Deno.writeTextFile("a", "foo");
Deno.writeTextFile("b", "bar");
const readable = concatReadableStreams(
Deno.openSync("a").readable,
Deno.openSync("b").readable,
);
await readable.cancel();
Solving the cleanup is actually more difficult than it looks. The ReadableStream that contains streams is going to have its pull method called several times before its cancel method is even called. That means one could cancel the current sub stream being worked on and the main stream, but any other sub streams that were already enqueued at that point won't have their cancel methods called. One could get around that by, instead of cancelling the main stream, continuing through it and calling cancel on each sub stream, but this would lead to a longer cleanup.
To demonstrate what I mean, with this implementation:
export class ConcatStreams<T> {
#readable: ReadableStream<T>
#writable: WritableStream<ReadableStream<T>>
constructor() {
const { readable, writable } = new TransformStream<ReadableStream<T>, ReadableStream<T>>()
this.#readable = new ReadableStream({
mainReader: readable.getReader(),
async pull(controller) {
if (!this.subReader) {
const { done, value } = await this.mainReader.read()
if (done)
return controller.close()
this.subReader = value.getReader()
}
const { done, value } = await this.subReader.read()
if (done) {
  // this sub stream is exhausted; clear it so the next pull moves on to the next one
  this.subReader = undefined
  return
}
controller.enqueue(value)
},
cancel(reason) {
this.subReader?.cancel(reason)
this.mainReader.cancel(reason)
}
} as UnderlyingSource<T> & { mainReader: ReadableStreamDefaultReader<ReadableStream<T>>, subReader?: ReadableStreamDefaultReader<T> })
this.#writable = writable
}
get readable(): ReadableStream<T> {
return this.#readable
}
get writable(): WritableStream<ReadableStream<T>> {
return this.#writable
}
}
Running this code:
const readables = new Array(5).fill(false).map((_x, i) => new ReadableStream<number>({
gen: function* func() {
for (let i = 0; i < 10; ++i)
yield i
}(),
pull(controller) {
const { done, value } = this.gen.next()
if (done)
return controller.close()
controller.enqueue(value)
},
cancel(reason) {
console.log(i, reason)
}
} as UnderlyingSource & { gen: Generator<number> }))
await new ReadableStream<ReadableStream<number>>({
pull(controller) {
if (readables.length) {
console.log('Pulling')
controller.enqueue(readables.shift()!)
}
else
controller.close()
},
cancel(reason) {
console.log('Reason:', reason)
readables.forEach(b => {
console.log(false)
b.cancel(reason)
})
}
})
.pipeThrough(new ConcatStreams())
.cancel('potato')
produces the following output:

Pulling
Pulling
Reason: potato
false
2 potato
false
3 potato
false
4 potato
The first two sub streams were pulled in, but their cancel methods were never called; subReader was still undefined at the time .cancel('potato') was called.
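A hedged sketch of the drain-and-cancel alternative mentioned above, written as a replacement for the cancel callback in the ConcatStreams implementation (it reuses mainReader and subReader from that code, so it is a fragment rather than a standalone example):

async cancel(reason) {
  // cancel the sub stream currently being read, if any
  await this.subReader?.cancel(reason)
  // keep reading the main stream so every already-enqueued sub stream
  // can be cancelled individually, then let the main stream finish
  while (true) {
    const { done, value } = await this.mainReader.read()
    if (done) break
    await value.cancel(reason)
  }
}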
This issue can be closed now since https://github.com/denoland/deno_std/pull/4747 has been merged
Thank you for the suggestion, @mfulton26, and for the implementation, @BlackAsLight.