hono icon indicating copy to clipboard operation
hono copied to clipboard

Event Listener on Client Disconnect

Open HeyITGuyFixIt opened this issue 1 year ago • 34 comments

What is the feature you are proposing?

As mentioned in Discord, could we have the ability to create event listeners for client disconnect or abort events? I would expect to be able to use them like c.addListener('abort', () => { ...} and optionally c.on('abort', () => { ...}. Of course, we need the removeListener and off functions as well for completeness.

HeyITGuyFixIt avatar Dec 03 '23 12:12 HeyITGuyFixIt

Example usage:

c.stream((stream) => {
  return new Promise((resolve, reject) => {
    let callback = (data) => stream.write(data);
    request.addListener('data', callback);
    request.addListener('end', () => {
      request.removeListener('data', callback);
      resolve();
    });
    c.addListener('abort', () => {
      request.removeListener('data', callback);
      resolve();
    });
  })
});

HeyITGuyFixIt avatar Dec 03 '23 13:12 HeyITGuyFixIt

It may also be useful to have something like c.canceled as a Boolean for loops. Example (credit: sven@Discord):

app.get("/all", (c) => {
  return c.streamText(async (stream) => {
    await stream.writeln("[");
    for await (let value of all()) {
      if (c.canceled) break;
      await stream.writeln(`${JSON.stringify(value)},`);
    }
    await stream.writeln("{}]");
  });
});

HeyITGuyFixIt avatar Dec 03 '23 13:12 HeyITGuyFixIt

@sor4chi

Do you know how to handle user's cancellation. If you run the c.stream() or c.streamText() on Bun like the following, it will not be stopped.

app.get('/stream', (c) => {
  return c.streamText(async (stream) => {
    for (;;) {
      await stream.writeln('Hello')
      // Can I know the user's cancellation?
      await stream.sleep(1000)
    }
  })
})

yusukebe avatar Dec 03 '23 13:12 yusukebe

@HeyITGuyFixIt

What do you mean client disconnect or abort events? ? For example, it might not be enabled to handle closing the browser's tab. Or you use some JavaScript on a client-side?

yusukebe avatar Dec 03 '23 14:12 yusukebe

@yusukebe I am talking about the client's connection to the server. I would expect an event to fire when the client disconnects from the server. This would apply to more than streaming, as the client could disconnect early from any request that could take time to finish the response.

HeyITGuyFixIt avatar Dec 03 '23 14:12 HeyITGuyFixIt

@HeyITGuyFixIt

How can I reproduce "disconnect"?

yusukebe avatar Dec 03 '23 14:12 yusukebe

@yusukebe you can try using the streamText example above, run it, browse to it, and while it is still streaming, stop loading it in the browser, or if you are using curl, Ctrl+c to disconnect.

HeyITGuyFixIt avatar Dec 03 '23 14:12 HeyITGuyFixIt

@HeyITGuyFixIt

Thanks, I can reproduce it. However, I'm not certain if it's possible to detect an event like closing the browser.

@sor4chi, what are your thoughts on this?

yusukebe avatar Dec 03 '23 14:12 yusukebe

@yusukebe node's http.ClientRequest has the abort event that is supposed to fire when "the request has been aborted by the client."

HeyITGuyFixIt avatar Dec 03 '23 14:12 HeyITGuyFixIt

@HeyITGuyFixIt

I'm not sure if the abort event can handle browser closing.

I might try implementing a Node.js application that uses the abort event to see how it handles a "disconnect." However, if you have a good example, please share it.

yusukebe avatar Dec 03 '23 14:12 yusukebe

Hi, @HeyITGuyFixIt @yusukebe

It is not possible with the current implementation, but I think it can be done by taking the abort event from the controller when creating the writable stream and passing a handler from outside the StreamAPI. I'll try it later.

sor4chi avatar Dec 04 '23 10:12 sor4chi

I've created an example of a Node.js HTTP server that handles user disconnections. If you close the browser tab, the server stops. We should implement a similar feature in Hono.

import * as http from 'http'
import { Readable } from 'stream'

const server = http.createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain; charset=UTF-8',
    'Transfer-Encoding': 'chunked',
    'X-Content-Type-Options': 'nosniff'
  })

  const readable = new Readable({
    read() {}
  })

  const interval = setInterval(() => {
    const data = `${new Date().toISOString()}`
    console.log(data)
    readable.push(`${data}\n`)
  }, 1000)

  readable.pipe(res)

  req.on('close', () => {
    console.log('Connection is closed')
    clearInterval(interval)
    readable.destroy()
  })
})

const PORT = 3000
server.listen(PORT, () => {
  console.log(PORT)
})

yusukebe avatar Dec 05 '23 09:12 yusukebe

By the way, can we observe a response cancellation event? I have no idea to subscribe the event. I came up with getting AbortController from event, but it didn’t work well.

sor4chi avatar Dec 05 '23 09:12 sor4chi

@sor4chi

By the way, can we observe a response cancellation event?

I don't know :) So, I've made the Node.js example.

Which runtime do you try it?

yusukebe avatar Dec 05 '23 09:12 yusukebe

This works well on Node.js. But it does not work on Bun well because maybe it has this issue: https://github.com/oven-sh/bun/issues/6758

So, we have to test it on Node.js (or Deno).

app.get('/', async (c) => {
  let cancelationRequested = false

  const readable = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder()
      ;(async () => {
        for (;;) {
          if (cancelationRequested) {
            console.log(cancelationRequested)
            break
          }
          const input = encoder.encode('Hello\n')
          console.log('Hello')
          controller.enqueue(input)
          await new Promise((resolve) => setTimeout(resolve, 1000))
        }
        try {
          controller.close()
        } catch (e) {
          console.log(e)
        }
      })()
    },
    cancel(reason) {
      console.log('Stream canceled', reason)
      cancelationRequested = true
    },
  })

  return new Response(readable, {
    headers: {
      'content-type': 'text/plain; charset=UTF-8',
      'x-content-type-options': 'nosniff',
      'transfer-encoding': 'chunked',
    },
  })
})

yusukebe avatar Dec 05 '23 10:12 yusukebe

Cool, surely a stream could get it from cancel or abort. Can't we get abort from something other than stream as @ HeyITGuyFixIt shows in his code...?

sor4chi avatar Dec 05 '23 12:12 sor4chi

@sor4chi

Can't we get abort from something other than stream as @ HeyITGuyFixIt shows in his code...?

I don't know. Please investigate it.

yusukebe avatar Dec 05 '23 15:12 yusukebe

After investigating, it appears that we can handle cancellation events, such as closing a browser tab, using ReadableStream because it has a cancel method. However, WritableStream, which is used in c.stream(), lacks a method similar to the cancel method of ReadableStream. Therefore, it might not be possible to handle cancellation with c.stream().

@sor4chi

If you have the time, I would appreciate it if you also could investigate this.

yusukebe avatar Dec 09 '23 04:12 yusukebe

@yusukebe even this code did not work in cloudflare workers.

I have code locally that implements the abort event handler on c.stream, but I have a feeling that it might make a lot of difference depending on the runtime.

Should we proceed this?

sor4chi avatar Dec 09 '23 06:12 sor4chi

https://github.com/honojs/hono/assets/80559385/fd4e4de5-ef97-4340-a69a-df6fb6c25402

sor4chi avatar Dec 09 '23 06:12 sor4chi

@sor4chi

Cool!

I don't know whether to merge or not, but can you make a PR? We can discuss it there.

yusukebe avatar Dec 09 '23 06:12 yusukebe

Yes, of course. Just a very hacky writing style, so please hold on a moment to refactor it out.

sor4chi avatar Dec 09 '23 07:12 sor4chi

Confirmed that Hono's stream.onAbort does not work with Bun. The following does work to detect if a stream has been disconnected:

return streamSSE(c, async (stream) => {
   c.req.signal.addEventListener("abort", () => console.log('Stream closed'))
   
   await stream.writeSSE({....})
})

Abort event is now triggered when the client triggers EventSource.close()

satyavh avatar Jan 20 '24 15:01 satyavh

HI, @satyavh! I would like to know which version of Hono you are using. In hono <= 3.12.0, onAbort on streamSSE did not work, fixed since 3.12.1.

sor4chi avatar Jan 20 '24 16:01 sor4chi

I'm on Hono 3.12.1, does not work for me with Bun, but could be Bun related (I'm on Bun v1.0.24)

satyavh avatar Jan 20 '24 20:01 satyavh

I'm running into the same issue. I want to find a way to close the server-side connection when the client side calls the close method on the EventSource. I have this working with using Node and Express, but haven't been able to duplicate that with Bun and Hono. I am using Bun 1.0.24 and Hono 3.12.6.

Here is the relevant part of my code. Note that c.req.signal is deprecated and I think we are supposed to use c.req.raw.signal now.

app.get("/sse", (c: Context) => {
  return streamSSE(c, async (stream) => {
    // This should be invoked when the client calls close on the EventSource,
    // but it is not.
    c.req.raw.signal.addEventListener("abort", () => {
      console.log("got abort event");
      // TODO: How can the connection be closed?
    });

    let count = 0;
    while (count < 10) {
      count++;

      await stream.writeSSE({
        // TODO: Why does the next line trigger an error?
        // event: "count", // optional
        id: String(crypto.randomUUID()), // optional
        data: String(count), // TODO: Is this required to be a string?
      });
    }
  });
});

mvolkmann avatar Jan 21 '24 01:01 mvolkmann

@satyavh @mvolkmann

Here is the usage of onAbort that Hono is expecting. Cloud you try this?

app.get("/on-abort", (c) => {
  return streamSSE(c, async (stream) => {
    stream.onAbort(() => {
      console.log("aborted");
    });

    for (let i = 0; i < 10; i++) {
      await stream.write(`data: ${i}\n\n`);
      await stream.sleep(1000);
    }
  });
});

sor4chi avatar Jan 21 '24 07:01 sor4chi

@sor4chi I tried your suggestion. It did not work for me. The client calls eventSource.close(), but the stream.onAbort callback function is never executed. You can see my simple client code at https://github.com/mvolkmann/server-sent-events-examples/blob/main/bun-sse/public/index.html and my simple server code at https://github.com/mvolkmann/server-sent-events-examples/blob/main/bun-sse/src/server.ts.

mvolkmann avatar Jan 21 '24 13:01 mvolkmann

No stream.onAbort() -> {} is definitely never called.

@mvolkmann c.req.raw.signal.addEventListener("abort", () => {}) works for me, and is triggered when the client triggers EventSource.close()

    c.req.raw.signal.addEventListener("abort", () => {
      console.log("got abort event");
      // TODO: How can the connection be closed?
    });

You don't need to close the connection, because when this is triggered the connection is already closed by the client.

But note that this doesn't stop the actually processing of logic in the endpoint. In other words the loop keeps going forever if you don't stop it manually (in your case until count is 10). Hono only closes the stream connection from the server if you return from the streamSSE callback! The same is true if you trigger stream.close()

Now you can imagine the memory and server load issues this can generate if 1000s of clients connect to your SSE endpoint. All those loops/logic will potentially keep going forever, specially if you keep the connection open as per the documentation using stream.sleep()

So what you need to do is something like this

app.get('/sse', async (c) => {
  return streamSSE(c, async (stream) => {
    let isOpen = true
    
    c.req.raw.signal.addEventListener("abort", () => {
      // this will stop the loop and make sure the streamSSE callback resolves
      isOpen = false
    })
  
    // this loop makes sure the SSE stream stays open, as it makes sure the callback on line 2 never resolves (that is the weird part of Hono)
    while (isOpen) {
      const message = `It is ${new Date().toISOString()}`
      await stream.writeSSE({
        data: message,
        event: 'time-update',
        id: String(id++),
      })
      await stream.sleep(1000)
    }
  })
})

satyavh avatar Jan 21 '24 14:01 satyavh

[IMO] There are the possibility that Bun does not support ReadableStream({ cancel }) in the first place.

Similarly, it has been confirmed that onAbort does not work for cloudflare workers. Currently, the only runtime that has been confirmed to work is node.

sor4chi avatar Jan 22 '24 09:01 sor4chi