
Performance optimization: `ReadableStreamDefaultReader.prototype.readMany`

Jarred-Sumner opened this issue 3 years ago • 3 comments

Some sources enqueue many small chunks, but `ReadableStreamDefaultReader.prototype.read` only returns one chunk at a time, and each call has non-trivial overhead (at least one tick per call).

When the developer knows they want to read everything queued, it is faster to ask for all the values available directly instead of reading one value at a time.
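For illustration (a sketch, not part of the proposal), this is the chunk-at-a-time loop the issue is describing; every queued chunk costs at least one awaited `read()` call, even when all the chunks are already sitting in the queue:

```javascript
// One await (at least one tick) per chunk, even when many chunks
// are already queued inside the stream.
async function drainOneByOne(stream) {
  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  reader.releaseLock();
  return chunks;
}
```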

I implemented this in Bun.

This led to a >30% performance improvement in server-side rendering React via `new Response(await renderToReadableStream(reactElement)).arrayBuffer()`.

Before / after: (benchmark screenshots not included)

Note: these numbers are out of date (Bun's implementation is faster now), but the delta above can be directly attributed to `ReadableStreamDefaultReader.prototype.readMany`.

Here is an example implementation that uses JavaScriptCore builtins:

```js
function readMany()
{
    "use strict";

    if (!@isReadableStreamDefaultReader(this))
        @throwTypeError("ReadableStreamDefaultReader.readMany() should not be called directly");

    const stream = @getByIdDirectPrivate(this, "ownerReadableStream");
    if (!stream)
        @throwTypeError("readMany() called on a reader owned by no readable stream");

    const state = @getByIdDirectPrivate(stream, "state");
    @putByIdDirectPrivate(stream, "disturbed", true);
    if (state === @streamClosed)
        return {value: [], size: 0, done: true};
    else if (state === @streamErrored)
        throw @getByIdDirectPrivate(stream, "storedError");

    var controller = @getByIdDirectPrivate(stream, "readableStreamController");

    const queue = @getByIdDirectPrivate(controller, "queue");
    var size = queue.size;
    var values = queue.content.toArray(false);
    var length = values.length;

    if (length > 0) {
        if (@isReadableByteStreamController(controller)) {
            // Byte streams may queue raw chunk descriptors; normalize them to Uint8Array.
            for (var i = 0; i < values.length; i++) {
                const buf = values[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer))
                    values[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
            }
        }

        @resetQueue(queue);

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller))
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);

        return @createFulfilledPromise({value: values, size, done: false});
    }

    var onPullMany = (result) => {
        if (result.done)
            return @createFulfilledPromise({value: [], size: 0, done: true});

        var controller = @getByIdDirectPrivate(stream, "readableStreamController");
        var queue = @getByIdDirectPrivate(controller, "queue");
        var value = [result.value].concat(queue.content.toArray(false));

        if (@isReadableByteStreamController(controller)) {
            for (var i = 0; i < value.length; i++) {
                const buf = value[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer))
                    value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
            }
        }

        var size = queue.size;
        @resetQueue(queue);

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller))
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);

        return @createFulfilledPromise({value: value, size: size, done: false});
    };

    var pullResult = controller.@pull(controller);
    if (pullResult && @isPromise(pullResult))
        return pullResult.@then(onPullMany);

    return onPullMany(pullResult);
}
```
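To make the contract concrete, here is a hedged, self-contained mock (not the JSC code above, and not spec text): a plain class standing in for a reader over an in-memory queue, whose `readMany()` returns every queued value in one call using the same `{value, size, done}` shape:

```javascript
// Illustrative only. `size` here is the number of queued values; in the real
// implementation it is the queue's accumulated size under the queuing strategy.
class MockDefaultReader {
  constructor(chunks) {
    this.queue = [...chunks];
  }
  // One value per call, like read().
  async read() {
    if (this.queue.length === 0) return { value: undefined, done: true };
    return { value: this.queue.shift(), done: false };
  }
  // Everything at once, like the proposed readMany().
  async readMany() {
    const value = this.queue.splice(0, this.queue.length);
    return { value, size: value.length, done: value.length === 0 };
  }
}
```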

I have no opinion about the name of the function - just that some way to read all the queued values in one call should be exposed in the public interface for developers.

Jarred-Sumner, Jun 14 '22

I've been considering the possibility of a new kind of reader that implements the Body mixin, which would provide opportunities for these kinds of optimizations. e.g.

```js
const readable = new ReadableStream({ ... });
const reader = new ReadableStreamBodyReader(readable);

await reader.arrayBuffer();
// or
await reader.blob();
// or
await reader.text();
```

The `ReadableStreamBodyReader` would provide opportunities to more efficiently consume the `ReadableStream` without going through the typical chunk-by-chunk slow path and would not require any changes to the existing standard API surface.

Now, this approach would necessarily consume the entire readable, so it won't work if we want an alternative that consumes the stream incrementally; but the basic idea remains the same: introducing a new kind of Reader is the right way to accomplish this.
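A userland approximation of the idea (`ReadableStreamBodyReader` is only a proposal; this sketch leans on the existing `Response` wrapper, whose Body mixin already knows how to drain a stream):

```javascript
// Not the proposed class: wrapping the readable in a Response exposes the Body
// mixin's arrayBuffer()/blob()/text() today, which is roughly the surface the
// proposed reader would offer with a dedicated fast path.
async function consumeAsText(readable) {
  return new Response(readable).text();
}
```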

jasnell, Jun 14 '22

> introducing a new kind of Reader is the right way to accomplish this

Yeah, that makes sense.

> I've been considering the possibility of a new kind of reader that implements the Body mixin … `const reader = new ReadableStreamBodyReader(readable);`

This is nice. My only thought here is that maybe `getReader("body")` would be nice to have, too.

> The `ReadableStreamBodyReader` would provide opportunities to more efficiently consume the `ReadableStream` without going through the typical chunk-by-chunk slow path and would not require any changes to the existing standard API surface.

Thinking more on this today: I think this problem needs to be addressed in the controller more than in the reader. The problem is that the reader can't tell the controller what it wants to do, because the controller starts doing work before the reader is acquired.

For converting a stream to an array/arrayBuffer/text/blob, instead of creating a bunch of chunks and merging them at the end, it should just not create a bunch of chunks in the first place.
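The merge cost being referred to looks roughly like this (a sketch, not Bun's implementation): every small chunk is allocated, kept alive, and then copied a second time into the final buffer.

```javascript
// The "merge at the end" step a smarter controller could skip: one extra
// allocation plus a full copy of every byte that was already chunked once.
function mergeChunks(chunks) {
  let total = 0;
  for (const chunk of chunks) total += chunk.byteLength;
  const merged = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    merged.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return merged;
}
```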

For streaming an HTTP response body, the HTTP socket/client is much better equipped to efficiently chunk up input than ReactDOM.

I think a new controller type would be more effective, but `ReadableStreamBodyReader` would also be nice.

Jarred-Sumner, Jun 15 '22

I like that and I’ve already implemented specialized functions for each of those cases, so it would make a lot of sense to just have a separate reader

Ideally, it would have a way to tell the WritableStream the format it wants, too.


Jarred-Sumner, Oct 11 '22