
🐛 Bug Report — Runtime APIs: piping `ReadableStream` to an `IdentityTransformStream` results in uncaught `TypeError: This WritableStream has been closed.`


Hey! 👋 With the following workerd configuration...

# config.capnp
using Workerd = import "/workerd/workerd.capnp";

const config :Workerd.Config = (
  services = [
    ( name = "main", worker = .worker ),
  ],
  sockets = [
    ( name = "http", address = "*:8080", http = (), service = "main" ),
  ]
);

const worker :Workerd.Worker = (
  modules = [
    ( name = "index.mjs", esModule = embed "index.mjs" )
  ],
  compatibilityDate = "2023-08-01",
);

// index.mjs
export default {
  async fetch(request, env, ctx) {
    const readable = new ReadableStream({
      pull(controller) {
        controller.enqueue(new TextEncoder().encode("abc"));
        controller.close();
      }
    });

    const identity = new IdentityTransformStream();
    ctx.waitUntil(readable.pipeTo(identity.writable));

    return new Response(identity.readable);
  }
}

...and running `workerd serve config.capnp --verbose`, workerd logs `Uncaught (in promise); exception = TypeError: This WritableStream has been closed.` on each request.

Grepping for the error string and adding `KJ_DBG`s at each place it appears shows the error comes from:

https://github.com/cloudflare/workerd/blob/9f56120ba310eab2247e45a139816e51981c7f8d/src/workerd/api/streams/internal.c%2B%2B#L814

Replacing const readable = new ReadableStream(...); with...

const encoder = new TextEncoder();
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
void writer.write(encoder.encode("abc"));
void writer.close();

...also leads to the same error.

Replacing `new IdentityTransformStream()` with `new TransformStream()` fixes the issue, as sketched below.
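For clarity, the working version is the original handler with only that substitution (a sketch):

// index.mjs — identical to the repro above, but with a standard TransformStream
export default {
  async fetch(request, env, ctx) {
    const readable = new ReadableStream({
      pull(controller) {
        controller.enqueue(new TextEncoder().encode("abc"));
        controller.close();
      }
    });

    // A JavaScript-backed TransformStream in place of the runtime's
    // IdentityTransformStream; pipeTo() then completes without the TypeError
    const identity = new TransformStream();
    ctx.waitUntil(readable.pipeTo(identity.writable));

    return new Response(identity.readable);
  }
}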

mrbbot avatar Aug 08 '23 17:08 mrbbot

// Create a DecompressionStream for the 'deflate' format
const decompressionStream = new DecompressionStream('deflate');

// Create a ReadableStream with the decrypted data
const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array(decrypted));
    controller.close();
  },
});

// Pipe the decrypted data through the decompression stream
return readableStream.pipeThrough(decompressionStream);

I am getting the same error after returning the new `Response`:

const inflatedBody = await decrypt_r2(c, await file_contents.arrayBuffer());

return new Response(inflatedBody, {
  status: 200,
  headers: {
    'Content-Type': 'application/octet-stream',
    'Content-Disposition': `attachment; filename="${filename}"`
  }
});
✘ [ERROR] Uncaught (async) TypeError: This WritableStream has been closed.
✘ [ERROR] Uncaught (in promise) TypeError: This WritableStream has been closed.

hoangprod avatar Aug 19 '23 18:08 hoangprod

Ditto on the CompressionStream side of things. As soon as I add .pipeThrough(new CompressionStream('gzip')) I start receiving the error described in this issue.

I think the issue might be broader than the title suggests... in my case I'm just piping a ReadableStream through a CompressionStream (no IdentityTransformStream involved), along these lines:

const eventStream = someReadableStream
  .pipeThrough(new TextEncoderStream())
  .pipeThrough(new CompressionStream('gzip')); // the error goes away if compression stream is not included

This is during local dev with `"wrangler": "3.7.0"`; I have not checked prod yet.
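For reference, a minimal self-contained version of that pattern (a sketch; the placeholder source stream below stands in for someReadableStream):

export default {
  async fetch() {
    // Placeholder stand-in for someReadableStream
    const someReadableStream = new ReadableStream({
      start(controller) {
        controller.enqueue("data: hello\n\n");
        controller.close();
      }
    });

    const eventStream = someReadableStream
      .pipeThrough(new TextEncoderStream())
      .pipeThrough(new CompressionStream('gzip')); // removing this line makes the error go away

    return new Response(eventStream);
  }
};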

marbemac avatar Sep 13 '23 20:09 marbemac

This problem has also been reported to me in zip.js. Here's a complete example that reproduces the issue with the `CompressionStream` API.

export default {
  async fetch() {
    const { readable, writable } = new CompressionStream("deflate-raw");
    const helloWorldStream = new ReadableStream({
      start(controller) {
        controller.enqueue(new TextEncoder().encode("Hello, World!"));
        controller.close();
      }
    });

    helloWorldStream.pipeTo(writable);
    
    return new Response(readable, {
      headers: {
        "Content-Disposition": 'attachment; filename="file.raw"',
        "Content-Type": "application/octet-stream",
        "Cache-Control": "no-cache",
      }
    });
  }
};

It can be circumvented with a fake `CompressionStream` API, defined as below.

// Capture the native class first; otherwise `new CompressionStream(format)`
// inside the wrapper would recurse into the wrapper itself
const NativeCompressionStream = globalThis.CompressionStream;

globalThis.CompressionStream = class {
  constructor(format) {
    const compressionStream = new NativeCompressionStream(format);
    const writer = compressionStream.writable.getWriter();
    // Writes are forwarded through a plain TransformStream and an explicit
    // writer; compressed output is exposed from the native readable side
    const { writable } = new TransformStream({
      async transform(chunk) {
        await writer.write(chunk);
      },
      async flush() {
        await writer.close();
      }
    });
    return { readable: compressionStream.readable, writable };
  }
};

// App code below
// ...
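This works, presumably, because the writable side handed to `pipeTo()` is now backed by a plain JavaScript `TransformStream` (which the thread above shows is unaffected), while chunks reach the native compressor through an explicit writer. For illustration, the earlier repro should then run unchanged (a sketch):

export default {
  async fetch() {
    // With the wrapper installed, `writable` is a plain TransformStream side
    const { readable, writable } = new CompressionStream("deflate-raw");
    const helloWorldStream = new ReadableStream({
      start(controller) {
        controller.enqueue(new TextEncoder().encode("Hello, World!"));
        controller.close();
      }
    });
    helloWorldStream.pipeTo(writable);
    return new Response(readable);
  }
};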

gildas-lormeau avatar May 21 '24 18:05 gildas-lormeau

I ran into the same issue, and `pipeTo` seems to cause the same problems as `pipeThrough`, just less frequently. This is an example of what I did as a workaround.

async function pipeThrough(r: ReadableStream, w: WritableStream) {
  const writer = w.getWriter()
  for await (const chunk of r) {
    await writer.write(chunk) // await each write to respect backpressure
  }
  await writer.close()
}

const { readable, writable } = new TransformStream<string, string>()
const compressor = new CompressionStream('gzip')

const pipe = readable
  .pipeThrough(new TextEncoderStream())

pipeThrough(pipe, compressor.writable) // replacement for .pipeThrough(new CompressionStream('gzip'))
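The compressed output is then read from the compressor's readable side; continuing the snippet above (a sketch):

// The gzipped bytes come out the other side of the CompressionStream
const response = new Response(compressor.readable)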

hhoughgg avatar Aug 12 '24 19:08 hhoughgg

👍 Any news on this? I recently encountered this problem, also while trying to use zip.js to zip some files in a Worker, like @gildas-lormeau reported. Testing locally on my development machine works; however, the bug appears when I deploy to Cloudflare.

tdnghia98 avatar Nov 20 '25 22:11 tdnghia98

I'm going to be able to come back around to this very shortly (within a couple of weeks). Will update once I have something.

jasnell avatar Nov 20 '25 22:11 jasnell