deno_std icon indicating copy to clipboard operation
deno_std copied to clipboard

feat(streams): `concatStreams()`

Open BlackAsLight opened this issue 1 year ago • 7 comments

Implements https://github.com/denoland/deno_std/issues/4500

This implementation broadens the scope of concatenating streams to concatenating anything that implements the Symbol.asyncIterator or Symbol.iterator. The reason for this is that the only difference needed to support the broader scope compared to just concatenating ReadableStreams exists in TypeScript alone.

This implementation allows it to be used in two ways, either;

  1. providing the "array" of "streams" as an argument new ConcatStreams(streams); or
  2. using it within a .pipeThrough method

When the static method ReadableStream.from is supported more widely in the browser, this code can be simplified by several lines.

BlackAsLight avatar May 16 '24 03:05 BlackAsLight

Codecov Report

Attention: Patch coverage is 84.00000% with 4 lines in your changes are missing coverage. Please review.

Project coverage is 91.45%. Comparing base (aa35b35) to head (6948d9f). Report is 27 commits behind head on main.

Files Patch % Lines
streams/concat_readable_streams.ts 84.00% 4 Missing :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4747      +/-   ##
==========================================
- Coverage   91.89%   91.45%   -0.44%     
==========================================
  Files         484      486       +2     
  Lines       41296    41340      +44     
  Branches     5319     5288      -31     
==========================================
- Hits        37947    37807     -140     
- Misses       3292     3474     +182     
- Partials       57       59       +2     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov[bot] avatar May 16 '24 05:05 codecov[bot]

I'm not sure it's good idea to implement this as TransformStream. No Deno API or Web API returns stream of streams or stream of iterables.

The original comment of #4500 suggests a function which concatenate the streams in an array. That design makes more sense to me.

kt3k avatar May 16 '24 06:05 kt3k

I'm not sure it's good idea to implement this as TransformStream. No Deno API or Web API returns stream of streams or stream of iterables.

The original comment of #4500 suggests a function which concatenate the streams in an array. That design makes more sense to me.

I changed it up to a ReadableStream constructor instead. Taking in an iterable, like an array, of readable streams and concatenating them on a pulling method.

There is also extra code to preform a clean up if the .cancel() method is called, which will be iterating through the rest of the array and calling cancel on each readable stream.

BlackAsLight avatar May 17 '24 07:05 BlackAsLight

Example:

const streams = new Array(10).fill(0).map((_x, i) =>
  ReadableStream.from(function* () {
    for (let j = i * 10; j < i * 10 + 10; ++j) {
      yield j;
    }
  }())
);

console.log(await Array.fromAsync(new ConcatStreams(streams)));
[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]

BlackAsLight avatar May 17 '24 07:05 BlackAsLight

I believe Yoshiya meant that he thought this should be a function, like mergeReadableStream() and zipReadableStreams(). We only implement classes when a function won't suffice.

@crowlKats, able to take a look at this?

iuioiua avatar May 19 '24 08:05 iuioiua

Oh okay, converting it to a function would be quite simple. Essentially just:

function concatStreams<T>(streams: ...) {
  const gen = ...
  let lock = false
  return new ReadableStream<T>({
    ... 
  })
}

I can do this if you'd like

BlackAsLight avatar May 19 '24 08:05 BlackAsLight

Sounds good 👍🏾

iuioiua avatar May 19 '24 08:05 iuioiua

here is a completely alternative implementation to match the above comment:

export function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let currentStream = 0;

  return new ReadableStream<T>({
    async pull(controller) {
      const stream = streams[currentStream];
      const reader = stream.getReader();
      try {
        const read = await reader.read();

        if (read.done) {
          currentStream++;
          if (streams.length == currentStream) {
            controller.close();
          } else {
            await this.pull(controller);
          }
        } else {
          controller.enqueue(read.value);
        }
      } catch (e) {
        controller.error(e);
      }

      reader.releaseLock();
    },
  });
}

crowlKats avatar May 19 '24 15:05 crowlKats

} else {
  await this.pull(controller);
}

I don't think this line is needed as you're essentially forcing the contents of all the streams inside the queue regardless if the queue is being emptied or not. Essentially turning a pulling method into a pushing one.

BlackAsLight avatar May 19 '24 19:05 BlackAsLight

@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is done, we would be enqueueing nothing, which isn't valid. That call just calls pull again to get the next value; it doesn't turn into a pushing one

crowlKats avatar May 19 '24 19:05 crowlKats

@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is done, we would be enqueueing nothing, which isn't valid. That call just calls pull again to get the next value; it doesn't turn into a pushing one

Oh ya, you're right. I was thinking about it wrong

BlackAsLight avatar May 19 '24 19:05 BlackAsLight