workers-sdk icon indicating copy to clipboard operation
workers-sdk copied to clipboard

Durable Object RPC ReadableStream cancel logs “Network connection lost” and later “IoContext”

Open schickling opened this issue 2 months ago • 2 comments

Durable Object RPC ReadableStream cancel logs “Network connection lost” and later “IoContext”

Summary

  • Canceling a ReadableStream returned from a Durable Object (via DO RPC) and proxied by a Worker results in Wrangler dev logs showing “Network connection lost” almost immediately, followed later (~1 minute) by an “IoContext” message — despite the stream’s cancel() being invoked in both the Worker and the DO.

Environment

  • Wrangler: 4.44.0 (devDependency in repro)
  • Node: v24.10.0
  • OS: macOS Darwin 15.6
  • Bun: 1.3.0 (for completeness)

Minimal reproduction

  • Repo: https://github.com/IMax153/do-stream-cancel-repro
  • Setup:
    • pnpm install
    • pnpm dev
  • In a second terminal:
    • curl -X POST localhost:8787/rpc
    • After a few seconds, Ctrl+C to cancel the response stream

Expected

  • Canceling the proxied ReadableStream cleanly tears down the stream without transport errors.
  • DO source cancel() is called and shutdown is graceful, no “Network connection lost” or delayed “IoContext” error logs.

Actual

  • “Network connection lost” appears quickly after Ctrl+C on the client.
  • After ~1 minute, an “IoContext” error line appears.
  • Screenshot in repro: assets/images/error.png
    • Raw URL: https://raw.githubusercontent.com/IMax153/do-stream-cancel-repro/refs/heads/main/assets/images/error.png

Relevant code (src/index.ts)

  • DO returns a stream and logs on cancel:
return new ReadableStream({
  async start(controller) {
    for await (const value of self.dataSource) {
      controller.enqueue(new TextEncoder().encode(String(value)))
    }
  },
  cancel() {
    console.log('CANCELLED')
  },
})
  • Worker forwards the stream and cancels on client abort:
const stream = await stub.rpc()
request.signal.addEventListener('abort', () => {
  console.log('CLIENT ABORTED REQUEST')
  stream.cancel()
})
return new Response(stream, { headers: { 'Content-Type': 'application/octet-stream' } })

Configuration (wrangler.jsonc)

{
  "compatibility_date": "2025-10-22",
  "compatibility_flags": [
    "enable_request_signal",
    "nodejs_compat"
  ],
  "durable_objects": {
    "bindings": [
      { "class_name": "MyDurableObject", "name": "MY_DURABLE_OBJECT" }
    ]
  }
}

Notes

  • The DO’s cancel() reliably logs (“CANCELLED”), confirming cancellation propagates across the DO RPC boundary.
  • If these logs are expected at a lower level (e.g. transport closing) it’d be helpful to clarify whether this should be surfaced as an error or as a benign cancellation signal; as-is, it looks like an actual network fault during normal cancellation.
  • Happy to run additional diagnostics (e.g. specific logging flags) if it helps pinpoint whether this is a Miniflare/workerd/wrangler presentation issue vs. runtime behavior.

Thanks!

schickling avatar Oct 23 '25 10:10 schickling

I can also see this error with your reproduction! Thank you for providing it. I also stripped out Wrangler and I could not produce the error with a workerd-only Worker...

config.capnp

using Workerd = import "/workerd/workerd.capnp";

const config :Workerd.Config = (
  services = [
    (name = "main", worker = .mainWorker),
  ],

  sockets = [
    # Serve HTTP on port 8080
    ( name = "http",
      address = "*:8080",
      http = (),
      service = "main"
    ),
  ]
);

const mainWorker :Workerd.Worker = (
  modules = [
    (name = "index.js", esModule = embed "src/index.js"),
  ],
  compatibilityDate = "2025-10-11",
  compatibilityFlags = ["enable_request_signal", "nodejs_compat"],

  bindings = [
    (
      name = "MY_DURABLE_OBJECT",
      durableObjectNamespace = (className = "MyDurableObject"),
    ),
  ],

  durableObjectNamespaces = [
    (
      className = "MyDurableObject",
      uniqueKey = "v1-MyDurableObject",
    ),
  ],

  durableObjectStorage = (
    inMemory = void,
  ),
);

src/index.js

import { DurableObject } from "cloudflare:workers";

async function* dataSource() {
  let counter = 0;
  while (true) {
    yield counter++;
    await new Promise((resolve) => setTimeout(resolve, 1_000));
  }
}

export class MyDurableObject extends DurableObject {
  dataSource;

  constructor(ctx, env) {
    super(ctx, env);
    this.dataSource = dataSource();
  }

  async rpc() {
    const self = this;
    return new ReadableStream({
      async start(controller) {
        for await (const value of self.dataSource) {
          controller.enqueue(new TextEncoder().encode(String(value)));
        }
      },
      cancel() {
        console.log("CANCELLED");
      },
    });
  }
}

export default {
  async fetch(request, env, _ctx) {
    const url = new URL(request.url);

    if (request.method === "POST" && url.pathname === "/rpc") {
      const stub = env.MY_DURABLE_OBJECT.getByName("foo");

      const stream = await stub.rpc();

      request.signal.addEventListener("abort", () => {
        console.log("CLIENT ABORTED REQUEST");
        stream.cancel();
      });

      const headers = new Headers({
        "Content-Type": "application/octet-stream",
      });

      return new Response(stream, { headers });
    }
    return new Response("Not found", { status: 404 });
  },
};
pnpm workerd serve config.capnp

petebacondarwin avatar Oct 27 '25 11:10 petebacondarwin

Thanks a lot for looking into this @petebacondarwin @lrapoport-cf - is there any update on this by any chance?

schickling avatar Nov 30 '25 13:11 schickling