AsyncIterator is always one message behind when using a ServerStreaming method.

Open aslatemts opened this issue 3 years ago • 1 comment

Hey, I just started using connect-web this week and have been finding it fantastic so far. However, we have encountered an issue when using the ServerStreaming promise client.

Describe the bug: When using a long-lived streaming connection, the stream appears to always be one message behind. We're trying to keep a streaming connection alive to receive log messages as they come in. To do this, we iterate over the AsyncIterator returned by our getLiveLogs method. This mostly works; however, the stream stays one message behind until the connection is closed, at which point the final message comes through.

To Reproduce: Loop over the AsyncIterator returned by a ServerStreaming method like so:

const logClient = createPromiseClient(LogService, transport);
for await (const logEvent of logClient.getLiveLogs(new GetLiveLogsRequest(), callOptions)) {
    console.log(logEvent);
}

You may need your server to send messages slowly to notice the issue: the first message is not displayed until a second one arrives.

Environment:

  • @bufbuild/connect-web version: 0.1.0
  • Framework and version: [email protected]
  • Browser and version: Google Chrome 104.0.5112.81

Possible solution: It looks like the issue occurs in the createEnvelopeReadableStream function in envelope.ts: https://github.com/bufbuild/connect-web/blob/aa0a5645077ea689b50b040ad8e6298a6b5612b6/packages/connect-web/src/envelope.ts#L57-L73

After reading the header from the buffer, the loop immediately awaits the next read instead of first checking whether the buffer already holds enough bytes to enqueue the message.

This causes the ReadableStream to keep the message buffered until the next message comes through (and reader.read() returns); it will stay one message behind until the stream closes.
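To make the effect concrete, here is a minimal simulation of that behavior. It is a sketch, not the connect-web source: `envelope` and `readsUntilFirstMessage` are hypothetical helpers, and each array element stands in for one chunk returned by `reader.read()`. It counts how many chunks have to arrive before the first message can be emitted, with and without a completeness check ahead of the read.

```typescript
type Header = { flags: number; length: number };

// Hypothetical helper: wraps a payload in a 5-byte envelope header
// (1 flag byte followed by a 4-byte big-endian length).
function envelope(payload: Uint8Array, flags = 0): Uint8Array {
  const out = new Uint8Array(5 + payload.length);
  out[0] = flags;
  new DataView(out.buffer).setUint32(1, payload.length); // big-endian by default
  out.set(payload, 5);
  return out;
}

// Hypothetical helper: returns how many chunks were consumed before the
// first message was complete (or the simulated stream ended).
function readsUntilFirstMessage(chunks: Uint8Array[], checkBeforeRead: boolean): number {
  let buffer = new Uint8Array(0);
  let header: Header | undefined;
  let reads = 0;
  const complete = () => header !== undefined && buffer.byteLength >= header.length + 5;
  for (;;) {
    if (header === undefined && buffer.byteLength >= 5) {
      const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
      header = { flags: buffer[0], length: view.getUint32(1) };
    }
    if (checkBeforeRead && complete()) break; // check ahead of the read
    if (reads === chunks.length) break; // simulated stream closed
    const chunk = chunks[reads++]; // stands in for `await reader.read()`
    const next = new Uint8Array(buffer.length + chunk.length);
    next.set(buffer);
    next.set(chunk, buffer.length);
    buffer = next;
    if (!checkBeforeRead && complete()) break; // check only after the read
  }
  return reads;
}

const first = envelope(new Uint8Array([1, 2, 3]));
const second = envelope(new Uint8Array([4]));
console.log(readsUntilFirstMessage([first, second], false)); // 2
console.log(readsUntilFirstMessage([first, second], true)); // 1
```

With the check placed only after the read, the first message is held back until the second chunk arrives, which matches the one-message lag described above.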

I modified the loop so that, immediately after reading the header from the buffer, it checks whether the buffer already contains the complete message, like so:

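for (;;) {
    // Parse the envelope header (1 flag byte + 4-byte big-endian length)
    // as soon as 5 bytes are buffered.
    if (header === void 0 && buffer.byteLength >= 5) {
        let length = 0;
        for (let i = 1; i < 5; i++) {
            length = (length << 8) + buffer[i];
        }
        header = { flags: buffer[0], length };
    }
    // Stop before reading again if header and payload are already buffered.
    if (header !== void 0 && buffer.byteLength >= header.length + 5) {
        break;
    }
    // Otherwise wait for more data from the underlying stream.
    const result = await reader.read();
    if (result.done) {
        break;
    }
    append(result.value);
}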

This works for us as expected.
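The same idea (drain whatever is already buffered before waiting for more data) can also be written as a push-based decoder. This is only a sketch of the principle: `EnvelopeDecoder` and its `push` method are hypothetical names and not part of connect-web, and it assumes the 5-byte header layout used in the loop above.

```typescript
// Hypothetical types and class for illustration; not the connect-web API.
type EnvelopedMessage = { flags: number; data: Uint8Array };

class EnvelopeDecoder {
  private buffer = new Uint8Array(0);

  // Appends a chunk and returns every message that is now complete.
  push(chunk: Uint8Array): EnvelopedMessage[] {
    const joined = new Uint8Array(this.buffer.length + chunk.length);
    joined.set(this.buffer);
    joined.set(chunk, this.buffer.length);
    this.buffer = joined;

    const messages: EnvelopedMessage[] = [];
    for (;;) {
      if (this.buffer.byteLength < 5) break; // header incomplete
      const view = new DataView(
        this.buffer.buffer,
        this.buffer.byteOffset,
        this.buffer.byteLength
      );
      const length = view.getUint32(1); // 4-byte big-endian payload length
      if (this.buffer.byteLength < 5 + length) break; // payload incomplete
      messages.push({
        flags: this.buffer[0],
        data: this.buffer.slice(5, 5 + length),
      });
      this.buffer = this.buffer.slice(5 + length); // keep any trailing bytes
    }
    return messages;
  }
}

const decoder = new EnvelopeDecoder();
const frame = new Uint8Array([0, 0, 0, 0, 2, 104, 105]); // flags 0, length 2, payload "hi"
console.log(decoder.push(frame.slice(0, 3)).length); // 0: header still incomplete
console.log(decoder.push(frame.slice(3)).length); // 1: emitted as soon as it is complete
```

Because every `push` drains the buffer before returning, a message is never held back waiting for the next chunk, and several messages arriving in one chunk are all emitted together.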

aslatemts, Aug 12 '22 02:08

Thanks for the detailed report, @aslatemts!

(Tracking internally in https://linear.app/bufbuild/issue/TCN-338)

timostamm, Aug 12 '22 09:08