hilla icon indicating copy to clipboard operation
hilla copied to clipboard

[push] There is no API for handling reconnect / resubscribe

Open Artur- opened this issue 3 years ago • 8 comments
trafficstars

Description of the bug

socket.io that is used to handle the websocket connection handles buffering of messages up until a certain timeout but if you are disconnected longer than that, or if you intentionally disconnect and reconnect, there needs to be an easy way to resubscribe to relevant fluxes. What you want to do to reconnect depends on the data you are subscribing to, e.g.

  • for a chat you might want to fetch messages send between the disconnect time and reconnect time + subscribe to new messages
  • for a chat you might alternatively want to subscribe to messages starting from the disconnect time if the flux has buffered the messages
  • for some kind of monitoring, you might want to fetch the current state and then subscribe to updates.

In many (most?) cases, the operation you want to do is the same you do when you first open a view or otherwise start a subscription.

Minimal reproducible example

Open a Flux connection, set Chrome to offline and back to online. Data emitted after this is not received in the browser.

Expected behavior

From a users point of view, you most of the time want the same end result regardless of if a certain view in an app has been open for the last 20 hours

Artur- avatar Apr 29 '22 17:04 Artur-

Use case cursor tracker

This is an example of initialization code in TS

    this.cursorId = await CursorTracker.join(); // Get an id from the server
    this.cursors = await CursorTracker.getCursors(this.cursorId); // Get all the current cursor positions
    const maxTimestamp = Math.max(...this.cursors.map((c) => c.timestamp));
    this.sub = CursorTracker.subscribe(this.cursorId, maxTimestamp).onNext((value) => { // Subscribe to all updates
      if (value.id !== this.cursorId) {
        this.cursors = [...this.cursors.filter((cursor) => cursor.id !== value.id), value];
      }
    });

What do you want to happen when if the underlying channel has been reconnected and you have no subscription and the server knows nothing about you? You want to execute the exact same code again.

Use case temperature plotting in a graph

Initialization:

this.values = await TemperatureEndpoint.getHistory();
const lastTimestamp = this.temperatures[0].at(-1).timestamp;
const subscriber = TemperatureEndpoint.subscribeToUpdates(lastTimestamp);
subscriber.onNext((value) => {
  this.values = [...this.values, value];
});

Initialization much as above with sensors instead of cursors. One difference in reconnect logic: You want to retain the existing values you have, fetch all the values after the last value received and then get updates. To accomplish this you can rewrite the code to always fetch between last known timestamp (default e.g. one hour ago) and current time. Then you again want to re-run the same code on reconnect.

** Use case simple chat**

A super simple chat does not keep anything in memory but only delivers messages to the current subscribers.

Initialization:

ChatEndpoint.join().onNext((msg) => {
    this.messages = [...this.messages, msg];
});

On reconnect you do exactly the same BUT you additionally want to clear the log or mark it with e.g. --- here potentially some messages were lost ---.

Artur- avatar May 03 '22 17:05 Artur-

Some potential solutions to this:

  1. Wrap the init + subscription code into a block that can be re-run. For instance:
keepAlive(() => {
  this.values = await TemperatureEndpoint.getHistory();
  const lastTimestamp = this.temperatures[0].at(-1).timestamp;
  const subscriber = TemperatureEndpoint.subscribeToUpdates(lastTimestamp);
  subscriber.onNext((value) => {
    this.values = [...this.values, value];
  });
});

When the network connection is re-established, the whole block would then be executed again.

  1. Support a provider as subscription method parameters in addition to the parameters themselves. For instance
const initializer = () => {
  this.values = await TemperatureEndpoint.getHistory();
  const lastTimestamp = this.temperatures[0].at(-1).timestamp;
  return { lastTimestamp };
}

const subscriber = TemperatureEndpoint.subscribeToUpdates(initializer);
subscriber.onNext((value) => {
  this.values = [...this.values, value];
});

When the network connection is re-established, the same subscription would then be re-activated by running first the initializer and then calling the endpoint with the result.

In all cases, you additionally want the option to listen to the events that the subscription is now inactive, i.e. cannot receive updates from the server. You might want to indicate this in the UI, e.g. in a monitoring chart saying that it currently does not work. When the subscription is active again you want to remove the text.

Artur- avatar May 04 '22 16:05 Artur-

Potential solution 3:

TemperatureEndpoint.connect(endpoint => {
  this.values = await endpoint.getHistory();
  const lastTimestamp = this.temperatures[0].at(-1).timestamp;
  const subscriber = endpoint.subscribeToUpdates(lastTimestamp);
  subscriber.onNext((value) => {
    this.values = [...this.values, value];
  });
});

There are two concepts: Connection and Subscription. The Connection represents the underlying server connection, the Subscription represents the connection to a server endpoint returning Flux. The block inside connect is re-run automatically when the server connection has been lost and re-established (unless you append a .withoutReconnect())

This way the connection is the logical place where you would add listeners for losing and re-establishing the server connection so you could update the UI.

Artur- avatar May 06 '22 06:05 Artur-

I think the requirements have changed a bit now when we have settled on using full-stack signals for synchronizing state. You should return a Flux on the server if you want to pass messages whereas you should return a signal instance if you want to synchronize a value. Both the cursor tracker example and the temperature monitor example look like synchronizing state rather than passing messages.

From that point of view, we would only need a new onDisconnect(callback: () => void) method in Subscription and then it would be up to application logic to determine whether it wants to try to call the server-side method again with suitable parameters to get a new subscription. The disconnect callback would only be called if the connection is disconnected for some external reason rather than the controlled cases based on calling cancel() or a server-side action that triggers onComplete or onError.

On special case to take into account is to somehow ensure that the callback is also run if you're offline when trying to open a subscription.

Legioth avatar Feb 14 '24 12:02 Legioth

For reference, this is what I think the cursor tracker example could look like with signals:

const cursors: ListSignal<Cursor> = CursorTracker.cursors();
const ownCursorSignal: ValueSignal<Cursor> = cursors.insertLast(currentCursorPosition, {connectionScoped: true}).signal;
effect(() => {
  const otherCursors: Cursor[] = cursors.value.filter((signal) => signal != ownCursorSignal).map((signal) => signal.value);
  console.log("Other cursors:", otherCursors);
});
element.addEventListener('mousemove', (event) => ownCursorSignal.value = createCursor(event));

The {connectionScoped: true} option instructs the server to automatically remove your cursor from the list when you are disconnected. (The entry gets inserted again if the browser tab is still open when you reconnect. You should explicitly remove the entry if you want to prevent this from happening.)

The temperature history would look like this:

const history: ListSignal<Temperature> = TemperatureEndpoint.getHistory();
effect(() => {
  const values: Temperature[] = history.items.value;
  console.log("Values:", values);
});

The client-side code for these simple examples are mostly similar. The true power from signals comes from the way the signal values are directly bound to React components and from the way the implementation on the server also doesn't have to care about method arguments for continuing from some specific state and so on.

Legioth avatar Feb 14 '24 13:02 Legioth

The core idea is to give the application developer a way of "manually" recovering from connectivity issues. More sophisticated "automatic" state management will eventually be handled through full-stack signals so the focus here is more about dealing with reconnects for a "stateless" stream of events.

We cannot assume that the Flux instance still exists when continuing since the user might have been offline for long enough to trigger a server-side timeout and cleanup. There might be opportunities to optimize for cases with a quick reconnect when the server-side state (i.e. the Flux instance) but that's out of scope - we will anyways also need a way of dealing with longer offline durations.

Legioth avatar Apr 23 '24 11:04 Legioth

It would make sense to make sure that all Atmosphere conditions are either exposed for the application logic in some way or handled automatically.

platosha avatar Apr 23 '24 11:04 platosha

Current handling starts here: https://github.com/vaadin/hilla/blob/fd5e8b1d9ba438de0bce7ce162ffdab9eb0bbc51/packages/ts/frontend/src/FluxConnection.ts#L109

Hilla handles these events: onClose, onError, onMessage, onOpen, onReopen. This means that some events defined by Atmosphere are not handled at all even though there might be fallbacks where it delegates to some other event handler if there's no handler for some specific event. The other observation is that e.g. the current onError handler is just logging the error to the console but not giving any way for the application logic to know that anything has happened.

Legioth avatar Apr 23 '24 11:04 Legioth