🐛 Bug Report — Runtime APIs: piping `ReadableStream` to an `IdentityTransformStream` results in uncaught `TypeError: This WritableStream has been closed.`
Hey! 👋 With the following workerd configuration...
using Workerd = import "/workerd/workerd.capnp";
const config :Workerd.Config = (
services = [
( name = "main", worker = .worker ),
],
sockets = [
( name = "http", address = "*:8080", http = (), service = "main" ),
]
);
const worker :Workerd.Worker = (
modules = [
( name = "index.mjs", esModule = embed "index.mjs" )
],
compatibilityDate = "2023-08-01",
);
// index.mjs
export default {
async fetch(request, env, ctx) {
const readable = new ReadableStream({
pull(controller) {
controller.enqueue(new TextEncoder().encode("abc"));
controller.close();
}
});
const identity = new IdentityTransformStream();
ctx.waitUntil(readable.pipeTo(identity.writable));
return new Response(identity.readable);
}
}
...running workerd serve config.capnp --verbose logs the following on every request: Uncaught (in promise); exception = TypeError: This WritableStream has been closed.
Finding the error string and adding KJ_DBGs to all those places shows the error is coming from:
https://github.com/cloudflare/workerd/blob/9f56120ba310eab2247e45a139816e51981c7f8d/src/workerd/api/streams/internal.c%2B%2B#L814
Replacing const readable = new ReadableStream(...); with...
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
void writer.write(new TextEncoder().encode("abc"));
void writer.close();
...also leads to the same error.
Replacing new IdentityTransformStream() with new TransformStream() fixes the issue.
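For reference, here is a minimal sketch of the handler with that swap applied. The name `worker` is only a placeholder so the snippet can be run outside workerd (for example in Node 18+); in index.mjs the object would be the default export. It shows the shape of the workaround, not a fix for the underlying bug.

```javascript
// Sketch only: the handler from the report with TransformStream swapped in.
// `worker` is a hypothetical local name; in index.mjs this object would be
// the module's default export.
const worker = {
  async fetch(request, env, ctx) {
    const readable = new ReadableStream({
      pull(controller) {
        controller.enqueue(new TextEncoder().encode("abc"));
        controller.close();
      },
    });
    // Standard identity transform instead of IdentityTransformStream.
    const identity = new TransformStream();
    // Keep the pipe alive until it finishes, as in the original report.
    ctx.waitUntil(readable.pipeTo(identity.writable));
    return new Response(identity.readable);
  },
};
```

Note that the default TransformStream accepts any chunk type, whereas IdentityTransformStream is byte-oriented, so the two are not strictly interchangeable in every handler.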
// Create a DecompressionStream for the 'deflate' format
const decompressionStream = new DecompressionStream('deflate');
// Create a ReadableStream with the decrypted data
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array(decrypted));
controller.close();
},
});
// Pipe the decrypted data through the decompression stream
return readableStream.pipeThrough(decompressionStream);
I am getting the same error here, after returning the new Response:
const inflatedBody = await decrypt_r2(c, await file_contents.arrayBuffer());
return new Response(inflatedBody, {
status: 200,
headers: {
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename="${filename}"`
}
});
X [ERROR] Uncaught (async) TypeError: This WritableStream has been closed.
X [ERROR] Uncaught (in promise) TypeError: This WritableStream has been closed.
Ditto on the CompressionStream side of things. As soon as I add .pipeThrough(new CompressionStream('gzip')) I start receiving the error described in this issue.
I think the issue might be broader than the title suggests... in my case I'm just piping a ReadableStream through a CompressionStream (no IdentityTransformStream involved), along these lines:
const eventStream = someReadableStream
.pipeThrough(new TextEncoderStream())
.pipeThrough(new CompressionStream('gzip')); // the error goes away if compression stream is not included
This is during local dev with "wrangler": "3.7.0" - have not checked prod yet.
This problem has also been reported to me in zip.js. Here's a complete example below to reproduce the issue with the CompressionStream API.
export default {
async fetch() {
const { readable, writable } = new CompressionStream("deflate-raw");
const helloWorldStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("Hello, World!"));
controller.close();
}
});
helloWorldStream.pipeTo(writable);
return new Response(readable, {
headers: {
"Content-Disposition": 'attachment; filename="file.raw"',
"Content-Type": "application/octet-stream",
"Cache-Control": "no-cache",
}
});
}
};
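In the example above the promise returned by pipeTo is dropped, so a rejection has nowhere to go. The sketch below assumes the error surfaces on that promise, which this thread does not confirm. `pipeAndReport` is a hypothetical helper, not a workerd API. It only makes the failure visible and does not address the underlying bug.

```javascript
// Hypothetical helper (not a workerd API): start a pipe and report any failure
// instead of leaving the returned promise without a rejection handler.
function pipeAndReport(readable, writable, report = console.error) {
  return readable.pipeTo(writable).catch((err) => {
    // The pipe has already failed at this point; we only surface the error.
    report(err);
  });
}
```

In the example above, this would replace the bare helloWorldStream.pipeTo(writable) call. If the handler has access to ctx, passing the returned promise to ctx.waitUntil keeps the pipe alive after the response is returned.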
The error can be circumvented by replacing the global CompressionStream with a wrapper class, defined as below.
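// Keep a reference to the native class before replacing the global; otherwise
// the constructor below would call itself recursively.
const NativeCompressionStream = globalThis.CompressionStream;
globalThis.CompressionStream = class {
  constructor(format) {
    const compressionStream = new NativeCompressionStream(format);
    const writer = compressionStream.writable.getWriter();
    const reader = compressionStream.readable.getReader();
    let forwarding;
    return new TransformStream({
      start(controller) {
        // Forward compressed output to the readable side of the wrapper.
        forwarding = (async () => {
          for (let r = await reader.read(); !r.done; r = await reader.read()) {
            controller.enqueue(r.value);
          }
        })();
      },
      async transform(chunk) {
        await writer.write(chunk);
      },
      async flush() {
        await writer.close();
        await forwarding;
      }
    });
  }
};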
// App code below
// ...
I ran into the same issue, and pipeTo seems to cause the same problems as pipeThrough, just less frequently. Here is an example of what I did as a workaround.
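// Manually copy chunks from r into w instead of relying on pipeTo/pipeThrough.
async function pipeThrough(r: ReadableStream, w: WritableStream) {
  const writer = w.getWriter()
  for await (const chunk of r) {
    // Await each write so backpressure is respected and failures surface here.
    await writer.write(chunk)
  }
  await writer.close()
}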
const { readable, writable } = new TransformStream<string, string>()
const compressor = new CompressionStream('gzip')
const pipe = readable
.pipeThrough(new TextEncoderStream())
pipeThrough(pipe, compressor.writable) // replacement for .pipeThrough(new CompressionStream('gzip'))
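To check that a manual loop like this still produces valid gzip output, a round trip through DecompressionStream is a quick sanity test. The sketch below is plain JavaScript and assumes a runtime where ReadableStream is async-iterable (workerd, Node 18+). `pump` and `gzipRoundTrip` are hypothetical names, not part of any API.

```javascript
// Hypothetical helper mirroring the manual loop above: copy chunks with a writer.
async function pump(readable, writable) {
  const writer = writable.getWriter();
  for await (const chunk of readable) {
    await writer.write(chunk);
  }
  await writer.close();
}

// Gzip a string via the manual pump, then gunzip it again with the standard APIs.
async function gzipRoundTrip(text) {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(text);
      controller.close();
    },
  });
  const compressor = new CompressionStream("gzip");
  // Not awaited yet: the pump must run while the compressed side is being read.
  const pumping = pump(
    source.pipeThrough(new TextEncoderStream()),
    compressor.writable
  );
  const restored = compressor.readable
    .pipeThrough(new DecompressionStream("gzip"))
    .pipeThrough(new TextDecoderStream());
  let out = "";
  for await (const piece of restored) {
    out += piece;
  }
  await pumping; // surface any write or close failure from the pump
  return out;
}
```

This sketch still uses pipeThrough on the decompression side purely to verify the data. In a Worker hitting this bug, the check is better run in a runtime that is not affected.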
👍 Any news on this? I recently encountered this problem, also while trying to use zip.js to zip some files in a Worker, as @gildas-lormeau reported. Testing locally on my development machine works. However, the bug appears when I deploy to Cloudflare.
I'm going to be able to come back around to this very shortly (within a couple of weeks). Will update once I have something.