async-iterable append() implementation is slow
https://github.com/connectrpc/connect-es/blob/36af3f296f6e70a59f59135c1658de664581f6bf/packages/connect/src/protocol/async-iterable.ts#L1008-L1017
this causes a repeated memcpy on the cpu every time a new chunk comes in.
imagine a 50kb response streaming in 10 bytes at a time. towards the end of the stream, nearly the entire accumulated buffer is copied again every time another 10 bytes arrive, so the total work grows quadratically.
the implementation here should mimic realloc, which generally grows the underlying buffer exponentially to avoid repeated copies.
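for illustration, here's a hedged sketch of the realloc-style strategy (the `GrowableBuffer` name and API are hypothetical, not connect-es code): grow capacity geometrically so appending n bytes costs O(n) amortized copies instead of O(n²):

```typescript
// Hypothetical sketch, not the connect-es implementation: an appender that
// grows its backing buffer exponentially, realloc-style, so repeated small
// appends don't recopy the whole buffer each time.
class GrowableBuffer {
  private buf = new Uint8Array(0);
  private length = 0;

  append(chunk: Uint8Array): void {
    const needed = this.length + chunk.byteLength;
    if (needed > this.buf.byteLength) {
      // Grow to at least `needed`, at least doubling, so the copy cost
      // is amortized O(1) per appended byte.
      const capacity = Math.max(needed, this.buf.byteLength * 2, 64);
      const next = new Uint8Array(capacity);
      next.set(this.buf.subarray(0, this.length));
      this.buf = next;
    }
    this.buf.set(chunk, this.length);
    this.length += chunk.byteLength;
  }

  // View of the bytes written so far (no copy).
  bytes(): Uint8Array {
    return this.buf.subarray(0, this.length);
  }
}
```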
one option is the uint8arraylist node module. i had some success integrating it, but it only supports ESM whereas connect needs to support CJS as well.
https://github.com/achingbrain/uint8arraylist
https://medium.com/@lagerdata/a-resizable-typedarray-f5b9a861958
Thanks for the issue! It's certainly possible to avoid a lot of allocations and copies.
- Uint8ArrayList keeps a reference of chunks and defers concatenation.
- GrowableUint8Array allocates legroom in a buffer.
The downside of both approaches is that neither implementation is a TypedArray, so they can't be used with views (e.g. Uint8Array, DataView). This complicates using them efficiently for reading envelope sizes.
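For context, a sketch of why view compatibility matters here: the 5-byte envelope header (a flags byte followed by a big-endian u32 length, as in `transformSplitEnvelope`) is read through a DataView, which only works over an ArrayBuffer-backed TypedArray:

```typescript
// Sketch: parsing the 5-byte envelope header (1 flags byte + u32 big-endian
// length). A DataView needs an ArrayBuffer-backed view, which wrapper types
// that defer concatenation can't provide directly.
function peekHeader(
  buffer: Uint8Array,
): { flags: number; length: number } | undefined {
  if (buffer.byteLength < 5) {
    return undefined; // header incomplete, wait for more data
  }
  const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
  return { flags: view.getUint8(0), length: view.getUint32(1) };
}
```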
The proper solution is resizable ArrayBuffers. The API allows growing on demand, makes it easy to allocate for an envelope, and is compatible with views. Unfortunately, it's relatively new, and we have to hold back on using it to support older browsers and Node.js < v20.
Besides transformSplitEnvelope, readAllBytes from the same file could also make use of resizable ArrayBuffers, as well as some other places in the code base, and several places in Protobuf-ES. So I'm not sure that it's worth putting much work into optimizing transformSplitEnvelope, when the more general optimization with native support is in sight.
Thanks for the resizable ArrayBuffers reference. I will see if I can wire that up.
Here's what I am using currently with uint8arraylist
diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts
index 9dbc0f9a..fb591a44 100644
--- a/packages/connect/src/protocol/async-iterable.ts
+++ b/packages/connect/src/protocol/async-iterable.ts
@@ -23,6 +23,7 @@ import {
import type { Serialization } from "./serialization.js";
import type { Compression } from "./compression.js";
import { assertReadMaxBytes } from "./limit-io.js";
+import { Uint8ArrayList } from "uint8arraylist";
/**
* A function that takes an asynchronous iterable as a source, and returns a
@@ -1008,36 +1009,28 @@ export function transformJoinEnvelopes(): AsyncIterableTransform<
export function transformSplitEnvelope(
readMaxBytes: number,
): AsyncIterableTransform<Uint8Array, EnvelopedMessage> {
- // append chunk to buffer, returning updated buffer
- function append(buffer: Uint8Array, chunk: Uint8Array): Uint8Array {
- const n = new Uint8Array(buffer.byteLength + chunk.byteLength);
- n.set(buffer);
- n.set(chunk, buffer.length);
- return n;
- }
-
// tuple 0: envelope, or undefined if incomplete
// tuple 1: remainder of the buffer
function shiftEnvelope(
- buffer: Uint8Array,
+ buffer: Uint8ArrayList,
header: { length: number; flags: number },
- ): [EnvelopedMessage | undefined, Uint8Array] {
+ ): EnvelopedMessage | undefined {
if (buffer.byteLength < 5 + header.length) {
- return [undefined, buffer];
+ return undefined;
}
- return [
- { flags: header.flags, data: buffer.subarray(5, 5 + header.length) },
- buffer.subarray(5 + header.length),
- ];
+ const data = buffer.subarray(5, 5 + header.length);
+ buffer.consume(5 + header.length);
+ return { flags: header.flags, data };
}
// undefined: header is incomplete
function peekHeader(
- buffer: Uint8Array,
+ bufferList: Uint8ArrayList,
): { length: number; flags: number } | undefined {
- if (buffer.byteLength < 5) {
+ if (bufferList.byteLength < 5) {
return undefined;
}
+ const buffer = bufferList.subarray(0, 5);
const view = new DataView(
buffer.buffer,
buffer.byteOffset,
@@ -1049,17 +1042,16 @@ export function transformSplitEnvelope(
}
return async function* (iterable): AsyncIterable<EnvelopedMessage> {
- let buffer = new Uint8Array(0);
+ const buffer = new Uint8ArrayList();
for await (const chunk of iterable) {
- buffer = append(buffer, chunk);
+ buffer.append(chunk);
for (;;) {
const header = peekHeader(buffer);
if (!header) {
break;
}
assertReadMaxBytes(readMaxBytes, header.length, true);
- let env: EnvelopedMessage | undefined;
- [env, buffer] = shiftEnvelope(buffer, header);
+ const env = shiftEnvelope(buffer, header);
if (!env) {
break;
}
I implemented something using ArrayBuffer#resize here: https://github.com/connectrpc/connect-es/compare/main...tmm1:connect-es:tmm1/arraybuffer
This is significantly affecting a project I am working on. An opt-in to resizable buffers would be appreciated. I tested the solution by @tmm1, and the positive impact was quite noticeable.
I gave resizable ArrayBuffer a shot. It performs really well, but we can get similar results with optimized allocations (see https://github.com/connectrpc/connect-es/pull/1530), and don't need to maintain two code paths (for environments with resizable support and for environments without).