deno_std
feat(streams): `concatStreams()`
Implements https://github.com/denoland/deno_std/issues/4500
This implementation broadens the scope from concatenating ReadableStreams to concatenating anything that implements `Symbol.asyncIterator` or `Symbol.iterator`. The reason is that the only difference needed to support the broader scope, compared to concatenating just ReadableStreams, exists in the TypeScript types alone.
This implementation allows it to be used in two ways, either:
- providing the "array" of "streams" as an argument: `new ConcatStreams(streams)`; or
- using it within a `.pipeThrough` method.
When the static method ReadableStream.from is supported more widely in the browser, this code can be simplified by several lines.
Codecov Report
Attention: Patch coverage is 84.00000% with 4 lines in your changes missing coverage. Please review.
Project coverage is 91.45%. Comparing base (aa35b35) to head (6948d9f). Report is 27 commits behind head on main.
| Files | Patch % | Lines |
|---|---|---|
| streams/concat_readable_streams.ts | 84.00% | 4 Missing :warning: |
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main    #4747      +/-   ##
==========================================
- Coverage   91.89%   91.45%   -0.44%
==========================================
  Files         484      486       +2
  Lines       41296    41340      +44
  Branches     5319     5288      -31
==========================================
- Hits        37947    37807     -140
- Misses       3292     3474     +182
- Partials       57       59       +2
```
:umbrella: View full report in Codecov by Sentry.
I'm not sure it's a good idea to implement this as a TransformStream. No Deno API or Web API returns a stream of streams or a stream of iterables.
The original comment in #4500 suggests a function that concatenates the streams in an array. That design makes more sense to me.
I changed it to use a ReadableStream constructor instead, taking in an iterable (like an array) of readable streams and concatenating them in the pull method.
There is also extra code to perform a cleanup if the .cancel() method is called, which iterates through the rest of the array and calls cancel on each remaining readable stream.
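The cancel cleanup described above could look roughly like this (a hypothetical sketch with illustrative names, not the PR's exact code):

```typescript
// Sketch of the cancel() cleanup: when the concatenated stream is cancelled,
// the current stream and every stream not yet started are cancelled as well.
function concatStreams<T>(streams: ReadableStream<T>[]): ReadableStream<T> {
  let i = 0;
  let reader: ReadableStreamDefaultReader<T> | undefined;
  return new ReadableStream<T>({
    async pull(controller) {
      while (i < streams.length) {
        reader ??= streams[i].getReader();
        const { done, value } = await reader.read();
        if (!done) {
          controller.enqueue(value);
          return;
        }
        // Current stream is exhausted; move on to the next one.
        reader.releaseLock();
        reader = undefined;
        i++;
      }
      controller.close();
    },
    async cancel(reason) {
      // Release the in-progress reader, then cancel the current stream and
      // every stream that has not been started yet.
      reader?.releaseLock();
      for (; i < streams.length; i++) {
        await streams[i].cancel(reason);
      }
    },
  });
}
```

Forwarding the cancel reason means producers backing the unread streams can stop doing work as soon as the consumer gives up.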
Example:

```typescript
const streams = new Array(10).fill(0).map((_x, i) =>
  ReadableStream.from(function* () {
    for (let j = i * 10; j < i * 10 + 10; ++j) {
      yield j;
    }
  }())
);
console.log(await Array.fromAsync(new ConcatStreams(streams)));
```

Output:

```
[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]
```
I believe Yoshiya meant that he thought this should be a function, like mergeReadableStreams() and zipReadableStreams(). We only implement classes when a function won't suffice.
@crowlKats, able to take a look at this?
Oh okay, converting it to a function would be quite simple. Essentially just:
```typescript
function concatStreams<T>(streams: ...) {
  const gen = ...
  let lock = false
  return new ReadableStream<T>({
    ...
  })
}
```
I can do this if you'd like
Sounds good 👍🏾
Here is a completely alternative implementation to match the above comment:
```typescript
export function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let currentStream = 0;
  return new ReadableStream<T>({
    async pull(controller) {
      const stream = streams[currentStream];
      const reader = stream.getReader();
      try {
        const read = await reader.read();
        if (read.done) {
          currentStream++;
          if (streams.length == currentStream) {
            controller.close();
          } else {
            await this.pull(controller);
          }
        } else {
          controller.enqueue(read.value);
        }
      } catch (e) {
        controller.error(e);
      }
      reader.releaseLock();
    },
  });
}
```
```typescript
} else { await this.pull(controller); }
```

I don't think this line is needed, as you're essentially forcing the contents of all the streams into the queue regardless of whether the queue is being emptied, essentially turning a pulling method into a pushing one.
@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is done, we would be enqueueing nothing, which isn't valid. That call just calls pull again to get the next value; it doesn't turn into a pushing one
Oh ya, you're right. I was thinking about it wrong