h3 icon indicating copy to clipboard operation
h3 copied to clipboard

fix(sendStream): handle stream abortion

Open JHolcman-T opened this issue 7 months ago • 4 comments

resolves #1045

Implements the fix proposed in #1045. Enables handling of aborted (endless) streams e.g. proxied SSE streams.

Note: This is my first contribution, feel free to suggest improvements of any kind 😊

JHolcman-T avatar May 03 '25 21:05 JHolcman-T

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

pi0 avatar May 05 '25 10:05 pi0

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

:loudspeaker: Thoughts on this report? Let us know!

codecov[bot] avatar May 05 '25 10:05 codecov[bot]

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

Sure, where would you expect this unittest to be located - I mean in which *.test.ts file?

JHolcman-T avatar May 05 '25 11:05 JHolcman-T

Hi @pi0

I came up with this test case. The only thing is I am not sure where to put it. It seems to me that no test file really fits :/ maybe utils.test.ts ?

    it("can abort endless stream request", async () => {
      let connectionClosed = false;
      let streamCancelled = false;

      app.use(
        "/",
        eventHandler(async (event) => {
          event.node.res.setHeader(
            "Content-Type",
            "text/event-stream; charset=utf-8",
          );
          event.node.res.setHeader("Cache-Control", "no-cache");
          event.node.res.setHeader("Connection", "keep-alive");
          event.node.res.setHeader("X-Accel-Buffering", "no");

          // Detect when client disconnects
          event.node.res.on("close", () => {
            connectionClosed = true;
          });

          // clean up the stream
          let intervalId;
          const encoder = new TextEncoder();

          const stream = new ReadableStream({
            start(controller) {
              intervalId = setInterval(() => {
                controller.enqueue(encoder.encode("data: ping...\n\n"));
              }, 10);
            },
            // this is called when the stream has been cancelled trouhgh the AbortController signal in the sendStream function
            // commenting out the part of abort signal in the sendStream function => this will not be called after the request is cancelled
            cancel() {
              streamCancelled = true;
              // clean up the stream
              clearInterval(intervalId);
            },
          });

          return sendStream(event, stream);
        }),
      );

      const res = await fetch(url + "/", {
        method: "GET",
        headers: {
          Accept: "text/event-stream",
        },
      });

      if (!res.body) {
        throw new Error("No response body");
      } else {
        let messagesCounter = 0;
        const abort = new AbortController();
        res.body
          .pipeTo(
            new WritableStream({
              write(chunk) {
                messagesCounter += 1;
                if (messagesCounter > 5) {
                  // abort reading of the stream after few chunks
                  // simulate client disconnect
                  abort.abort();
                }
              },
            }),
            { signal: abort.signal },
          )
          // hanlde the error that is thrown when the stream is aborted
          .catch((err) => {});
      }
      await new Promise((resolve) => {
        setTimeout(() => {
          resolve(true);
        }, 500);
      });
      expect(res.status).toEqual(200);
      expect(connectionClosed !== false).toBe(true);
      expect(streamCancelled !== false).toBe(true);
    });

JHolcman-T avatar May 24 '25 10:05 JHolcman-T