fix(sendStream): handle stream abortion
resolves #1045
Implements the fix proposed in #1045. Enables handling of aborted (endless) streams e.g. proxied SSE streams.
Note: This is my first contribution, feel free to suggest improvements of any kind 😊
Thank you for PR, looks like a nice fix.
Would you be able to add one unit test?
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
:loudspeaker: Thoughts on this report? Let us know!
Thank you for PR, looks like a nice fix.
Would you be able to add one unit test?
Sure, where would you expect this unittest to be located - I mean in which *.test.ts file?
Hi @pi0
I came up with this test case. The only thing is I am not sure where to put it. It seems to me that no test file really fits :/ maybe utils.test.ts ?
it("can abort endless stream request", async () => {
let connectionClosed = false;
let streamCancelled = false;
app.use(
"/",
eventHandler(async (event) => {
event.node.res.setHeader(
"Content-Type",
"text/event-stream; charset=utf-8",
);
event.node.res.setHeader("Cache-Control", "no-cache");
event.node.res.setHeader("Connection", "keep-alive");
event.node.res.setHeader("X-Accel-Buffering", "no");
// Detect when client disconnects
event.node.res.on("close", () => {
connectionClosed = true;
});
// clean up the stream
let intervalId;
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
intervalId = setInterval(() => {
controller.enqueue(encoder.encode("data: ping...\n\n"));
}, 10);
},
// this is called when the stream has been cancelled trouhgh the AbortController signal in the sendStream function
// commenting out the part of abort signal in the sendStream function => this will not be called after the request is cancelled
cancel() {
streamCancelled = true;
// clean up the stream
clearInterval(intervalId);
},
});
return sendStream(event, stream);
}),
);
const res = await fetch(url + "/", {
method: "GET",
headers: {
Accept: "text/event-stream",
},
});
if (!res.body) {
throw new Error("No response body");
} else {
let messagesCounter = 0;
const abort = new AbortController();
res.body
.pipeTo(
new WritableStream({
write(chunk) {
messagesCounter += 1;
if (messagesCounter > 5) {
// abort reading of the stream after few chunks
// simulate client disconnect
abort.abort();
}
},
}),
{ signal: abort.signal },
)
// hanlde the error that is thrown when the stream is aborted
.catch((err) => {});
}
await new Promise((resolve) => {
setTimeout(() => {
resolve(true);
}, 500);
});
expect(res.status).toEqual(200);
expect(connectionClosed !== false).toBe(true);
expect(streamCancelled !== false).toBe(true);
});