
SSE thread will die silently

Open joshmcculloch opened this issue 1 year ago • 2 comments

Currently, any exception in the SSEClient/EventLoop causes the thread to exit. In my testing I see the httpx.ReadTimeout exception when the connection to the server is dropped. There is no indication to the subscriber that new events are no longer getting through.

I'm happy to help with the fix, but I'm looking for some guidance on how to push the event through to the calling code and how the situation should be resolved.

To patch this for my application, I just catch the exception, sleep, and allow a new connection to be established. Perhaps this was always the intention, as there is already a while loop that causes the reconnect. Link to my change

joshmcculloch avatar Jan 21 '24 07:01 joshmcculloch
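One possible way to surface the failure to the calling code is an error callback invoked from the thread's `run` method. The following is only a minimal sketch under assumptions: `NotifyingLoop`, `on_error`, and `flaky_stream` are hypothetical names for illustration and are not part of the pocketbase library.

```python
import threading


class NotifyingLoop(threading.Thread):
    """Hypothetical stand-in for an SSE event loop thread."""

    def __init__(self, stream_factory, on_error):
        super().__init__(daemon=True)
        self.stream_factory = stream_factory  # callable returning an iterable of events
        self.on_error = on_error              # called with the exception that ended the loop
        self.received = []

    def run(self):
        try:
            for event in self.stream_factory():
                self.received.append(event)
        except Exception as exc:
            # Without this handler the thread would simply exit and the
            # subscriber would never learn that events stopped arriving.
            self.on_error(exc)


def flaky_stream():
    """Yield one event, then fail the way a dropped connection might."""
    yield "event-1"
    raise TimeoutError("connection dropped")


errors = []
loop = NotifyingLoop(flaky_stream, errors.append)
loop.start()
loop.join()
print(loop.received, errors)
```

The callback runs on the event loop thread, so a real subscriber would need to keep it short or hand the exception off to its own thread.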

Thank you for looking into this. Your solution looks fine for the situation where the client loses the connection because of, e.g., a temporary network issue.

However, there is a second situation where the server loses the client due to, e.g., a restart/update of the pocketbase server or a server-side timeout.

In this case the client would have to rerun the RealtimeService._submit_subscriptions() logic so that the server receives the current subscriptions again...

Currently you would end up in a kind of infinite reconnect loop if the server loses track of the client...

m29h avatar Jan 21 '24 10:01 m29h
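The second situation can be illustrated with a toy model in which the server forgets its clients on restart and the client resubmits its topics on every connect. This is a sketch only: `FakeServer` and `ResubscribingClient` are hypothetical stand-ins and do not reflect how pocketbase or RealtimeService is actually implemented.

```python
class FakeServer:
    """Hypothetical in-memory stand-in for a realtime server."""

    def __init__(self):
        self._next_id = 0
        self.subscriptions = {}  # client_id -> set of topics

    def connect(self):
        """Register a new client and return its id."""
        self._next_id += 1
        client_id = f"client-{self._next_id}"
        self.subscriptions[client_id] = set()
        return client_id

    def restart(self):
        """Simulate a restart: all client state is lost."""
        self.subscriptions.clear()

    def subscribe(self, client_id, topics):
        if client_id not in self.subscriptions:
            raise KeyError(f"unknown client {client_id}")
        self.subscriptions[client_id] = set(topics)


class ResubscribingClient:
    """Hypothetical client that resubmits its topics on every connect."""

    def __init__(self, server):
        self.server = server
        self.topics = set()
        self.client_id = None

    def connect(self):
        # A fresh connection gets a fresh id, so the old subscriptions
        # must be sent again under the new id.
        self.client_id = self.server.connect()
        self.server.subscribe(self.client_id, self.topics)


server = FakeServer()
client = ResubscribingClient(server)
client.topics.add("posts")
client.connect()
server.restart()   # the server loses track of the client
client.connect()   # reconnecting restores the subscriptions
print(server.subscriptions)
```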

Inspired by the code in the original comment, here's a patch you can use without altering the library. It adds simple exponential backoff, but is otherwise functionally the same. This gives automatic reconnecting and resubscribing to all events, but there's no attempt to catch up on missed events.

Of course I would prefer to see this integrated into the library, but this may help for now.

"""Patch for pocketbase to handle httpx exceptions in the EventLoop, allowing for automatic reconnection."""

import time
from math import floor

import httpx


def _patched_run(self): 
    print('Using patched EventLoop.run') # remove me
    error = False
    backoff_time = 0
    while not self.kill:
        if error:
            backoff_time = floor(min(20, (backoff_time + 1) * 1.8))  # 1, 3, 7, 14, 20 seconds
            error = False
        else:
            backoff_time = 0
        if backoff_time:
            time.sleep(backoff_time)
        try:
            for event in self._events():
                if self.kill:
                    break
                if event.event in self.listeners:
                    self.listeners[event.event](event)
        except (httpx.ReadError, httpx.ConnectError, httpx.TimeoutException):
            # Transient network failure: back off, then reconnect.
            error = True
        except Exception:
            # Any other failure stops the loop.
            self.kill = True
        else:
            error = False


def patch_pocketbase_see():
    from pocketbase.services.utils.sse import EventLoop

    EventLoop.run = _patched_run

coldino avatar Jan 12 '25 13:01 coldino
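To see which delays the backoff expression in the patch produces over consecutive failures, it can be evaluated on its own. This is a small sketch; `next_backoff` and `schedule` are hypothetical helper names, and only the expression itself is taken from the patch.

```python
from math import floor


def next_backoff(previous, cap=20, factor=1.8):
    """Return the next delay in seconds, using the same expression as the patch."""
    return floor(min(cap, (previous + 1) * factor))


def schedule(failures):
    """Return the delays slept over a run of consecutive failures."""
    delays, delay = [], 0
    for _ in range(failures):
        delay = next_backoff(delay)
        delays.append(delay)
    return delays


print(schedule(6))  # [1, 3, 7, 14, 20, 20]
```

The delay stays at the 20-second cap for every failure after the fifth.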