
suggestion: `concatReadableStreams()`

Open mfulton26 opened this issue 1 year ago • 14 comments

Is your feature request related to a problem? Please describe.

I want to combine multiple ReadableStream instances into a single ReadableStream instance where each stream is read fully in sequence.

Describe the solution you'd like

A function to go along with earlyZipReadableStreams, mergeReadableStreams, and zipReadableStreams, but one that processes the streams in order, one at a time.

e.g.

function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        for (const stream of streams) {
          for await (const chunk of stream) {
            controller.enqueue(chunk);
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}
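
For instance, a minimal usage sketch of the helper above (Deno or another runtime with ReadableStream.from() and Array.fromAsync() assumed; the stream contents are placeholders):

const combined = concatReadableStreams(
  ReadableStream.from(["a", "b"]),
  ReadableStream.from(["c"]),
);
console.log(await Array.fromAsync(combined)); // ["a", "b", "c"]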

Describe alternatives you've considered

I wonder if this isn't already included because there's some reason not to do this with streams, or because there's already a simple way to do it built into the JS Streams API that I have yet to find.

mfulton26 avatar Mar 17 '24 02:03 mfulton26

While composing multiple readable streams into a single one I found that in some cases I need to generate the streams asynchronously. This led me to a different implementation with a signature more like that of ReadableStream.from() than of mergeReadableStreams:

type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        if (Symbol.asyncIterator in streams) {
          for await (const stream of streams) {
            for await (const chunk of stream) {
              controller.enqueue(chunk);
            }
          }
        } else {
          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
          for (const stream of streams) {
            for await (const chunk of await stream) {
              controller.enqueue(chunk);
            }
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}
@@ -1,12 +1,23 @@
+type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
+
 function concatReadableStreams<T>(
-  ...streams: ReadableStream<T>[]
+  streams: AnyIterable<ReadableStream<T>>,
 ): ReadableStream<T> {
   return new ReadableStream({
     async start(controller) {
       try {
-        for (const stream of streams) {
-          for await (const chunk of stream) {
-            controller.enqueue(chunk);
+        if (Symbol.asyncIterator in streams) {
+          for await (const stream of streams) {
+            for await (const chunk of stream) {
+              controller.enqueue(chunk);
+            }
+          }
+        } else {
+          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
+          for (const stream of streams) {
+            for await (const chunk of await stream) {
+              controller.enqueue(chunk);
+            }
           }
         }
         controller.close();

mfulton26 avatar Mar 18 '24 12:03 mfulton26

Hmmm… I found a different way to concatenate streams by actually using ReadableStream.from() and then using pipeThrough() with a TransformStream to flatten it. I'm not sure yet which solution I prefer though.

class FlatStream<T> extends TransformStream<ReadableStream<T>, T> {
  constructor() {
    super({
      async transform(stream, controller) {
        for await (const chunk of stream) {
          controller.enqueue(chunk);
        }
      },
    });
  }
}

type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return ReadableStream.from(streams).pipeThrough(new FlatStream());
}
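
For instance, a sketch of the lazily-generated-streams case that motivated this signature (the file names are placeholders):

async function* openParts() {
  for (const path of ["part1.txt", "part2.txt"]) {
    // Files are opened lazily as the pipeline pulls them, not all up front.
    yield (await Deno.open(path)).readable;
  }
}
await concatReadableStreams(openParts())
  .pipeTo((await Deno.create("combined.txt")).writable);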

mfulton26 avatar Mar 18 '24 12:03 mfulton26

Are you able to provide a few use cases for this suggestion?

iuioiua avatar Mar 18 '24 20:03 iuioiua

Examples:

  • streaming multiple files into one stream (e.g. to a file or network response)
  • adding a content prefix to a response stream from a fetch Response

Keeping things as streams reduces maximum memory usage, etc. A rough sketch of the second case follows.
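
This assumes the variadic concatReadableStreams() helper proposed above; the URL and prefix are placeholders:

const prefix = ReadableStream.from([new TextEncoder().encode("data: ")]);
const res = await fetch("https://example.com/body.txt"); // placeholder URL
const combined = concatReadableStreams(prefix, res.body!);
// Stream the prefixed body onward without buffering it all in memory.
await combined.pipeTo(Deno.stdout.writable, { preventClose: true });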

mfulton26 avatar Mar 25 '24 19:03 mfulton26

Seems reasonable to have 👍🏾

iuioiua avatar Mar 25 '24 21:03 iuioiua

Isn't the structure below easy/straightforward enough to understand?

for await (const stream of streams) {
  for await (const chunk of stream) {
    // process chunk
  }
}

I don't see much point in adding this as a util.

kt3k avatar Mar 26 '24 12:03 kt3k

@kt3k with the for…await loops how do you use it with pipeTo() and/or pipeThrough()?

If I want to process chunks then the looping makes perfect sense to me, but if I want to stream multiple readable streams combined together to a file or to a network response then the loops themselves are not helpful (although they may be part of the solution).

mfulton26 avatar Mar 26 '24 14:03 mfulton26

Fair enough

kt3k avatar Mar 27 '24 06:03 kt3k

> @kt3k with the for…await loops how do you use it with pipeTo() and/or pipeThrough()?
>
> If I want to process chunks then the looping makes perfect sense to me, but if I want to stream multiple readable streams combined together to a file or to a network response then the loops themselves are not helpful (although they may be part of the solution).

await ReadableStream.from(async function* () {
  for await (const stream of streams)
    for await (const chunk of stream)
      yield chunk
}())
  .pipeThrough(new CompressionStream('gzip'))
  .pipeTo((await Deno.create('abc.gz')).writable)

BlackAsLight avatar May 14 '24 03:05 BlackAsLight

This class would allow people to either pass an iterable of a bunch of streams to it upon construction or use it within a .pipeThrough method.

type Streams<T> = AsyncIterable<ReadableStream<T>> | Iterable<ReadableStream<T>>

class ConcatStreams<T> {
  #readable: ReadableStream<T>
  #writable: WritableStream<Streams<T>>
  constructor(streams?: Streams<T>) {
    const { readable, writable } = new TransformStream()
    this.#readable = readable
    this.#writable = writable
    if (streams)
      ReadableStream.from(async function* () {
        for await (const stream of streams)
          for await (const chunk of stream)
            yield chunk
      }()).pipeTo(this.#writable)
  }

  get readable(): ReadableStream<T> {
    return this.#readable
  }

  get writable(): WritableStream<Streams<T>> {
    return this.#writable
  }
}
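
A minimal usage sketch of the constructor mode (stream contents are placeholders). Note that, as sketched above, only the constructor path flattens; using it via .pipeThrough() would need the flattening moved into the TransformStream itself, which the implementation later in this thread does.

const combined = new ConcatStreams([
  ReadableStream.from([1, 2]),
  ReadableStream.from([3, 4]),
]).readable;
console.log(await Array.fromAsync(combined)); // [1, 2, 3, 4]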

BlackAsLight avatar May 14 '24 03:05 BlackAsLight

I closed my PR to open it up for contributors. PRs are welcome! Feel free to use the tests I wrote in #4723.

iuioiua avatar May 14 '24 23:05 iuioiua

When the output ReadableStream is canceled, the inputs should be canceled too, so that the code below does not leak resources.

Deno.writeTextFile("a", "foo");
Deno.writeTextFile("b", "bar");
const readable = concatReadableStreams(
  Deno.openSync("a").readable,
  Deno.openSync("b").readable,
);
await readable.cancel();

0f-0b avatar May 16 '24 04:05 0f-0b

> When the output ReadableStream is canceled, the inputs should be canceled too, so that the code below does not leak resources.
>
> await Deno.writeTextFile("a", "foo");
> await Deno.writeTextFile("b", "bar");
> const readable = concatReadableStreams(
>   Deno.openSync("a").readable,
>   Deno.openSync("b").readable,
> );
> await readable.cancel();

Solving the cleanup is actually more difficult than it looks. The ReadableStream that contains the sub streams is going to have its pull method called several times before its cancel method is even called. That means one can cancel the current sub stream being worked on and the main stream, but any other sub streams that were already enqueued at that point won't have their cancel methods called. One could get around that by, instead of cancelling the main stream, continuing through it and calling cancel on each sub stream, but this would lead to a longer cleanup.

BlackAsLight avatar May 16 '24 05:05 BlackAsLight

To demonstrate what I mean, with this implementation:

export class ConcatStreams<T> {
    #readable: ReadableStream<T>
    #writable: WritableStream<ReadableStream<T>>
    constructor() {
        const { readable, writable } = new TransformStream<ReadableStream<T>, ReadableStream<T>>()
        this.#readable = new ReadableStream({
            mainReader: readable.getReader(),
            async pull(controller) {
                while (true) {
                    if (!this.subReader) {
                        const { done, value } = await this.mainReader.read()
                        if (done)
                            return controller.close()
                        this.subReader = value.getReader()
                    }
                    const { done, value } = await this.subReader.read()
                    if (done) {
                        // This sub stream is exhausted; move on to the next one.
                        this.subReader = undefined
                        continue
                    }
                    return controller.enqueue(value)
                }
            },
            cancel(reason) {
                this.subReader?.cancel(reason)
                this.mainReader.cancel(reason)
            }
        } as UnderlyingSource<T> & { mainReader: ReadableStreamDefaultReader<ReadableStream<T>>, subReader?: ReadableStreamDefaultReader<T> })
        this.#writable = writable
    }

    get readable(): ReadableStream<T> {
        return this.#readable
    }

    get writable(): WritableStream<ReadableStream<T>> {
        return this.#writable
    }
}

Running this code:

const readables = new Array(5).fill(false).map((_x, i) => new ReadableStream<number>({
    gen: function* func() {
        for (let i = 0; i < 10; ++i)
            yield i
    }(),
    pull(controller) {
        const { done, value } = this.gen.next()
        if (done)
            return controller.close()
        controller.enqueue(value)
    },
    cancel(reason) {
        console.log(i, reason)
    }
} as UnderlyingSource & { gen: Generator<number> }))

await new ReadableStream<ReadableStream<number>>({
    pull(controller) {
        if (readables.length) {
            console.log('Pulling')
            controller.enqueue(readables.shift()!)
        }
        else
            controller.close()
    },
    cancel(reason) {
        console.log('Reason:', reason)
        readables.forEach(b => {
            console.log(false)
            b.cancel(reason)
        })
    }
})
    .pipeThrough(new ConcatStreams())
    .cancel('potato')

The output is:

Pulling
Pulling
Reason: potato
false
2 potato
false
3 potato
false
4 potato

The first two sub streams were pulled in, but their cancel methods were never called. subReader at the time of calling .cancel('potato') was still undefined.

BlackAsLight avatar May 16 '24 06:05 BlackAsLight

This issue can be closed now since https://github.com/denoland/deno_std/pull/4747 has been merged.
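
For anyone reading later, a minimal usage sketch of the merged helper; the @std/streams import specifier and the variadic signature here are assumptions based on this thread, so check the current docs for the exact API:

import { concatReadableStreams } from "@std/streams";

const stream = concatReadableStreams(
  ReadableStream.from([1, 2, 3]),
  ReadableStream.from([4, 5, 6]),
);
console.log(await Array.fromAsync(stream)); // [1, 2, 3, 4, 5, 6]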

BlackAsLight avatar May 21 '24 02:05 BlackAsLight

Thank you for the suggestion, @mfulton26, and for the implementation, @BlackAsLight.

iuioiua avatar May 21 '24 03:05 iuioiua