rest icon indicating copy to clipboard operation
rest copied to clipboard

SSE API issues with async operations

Open FroMage opened this issue 5 years ago • 2 comments

I don't think the spec is consistent with async programming or proper usage of CompletionStage when it comes to SSE handling. For example, this documentation of SseEventSink:

    /**
     * Send an outbound Server-sent event to this sink.
     * <p>
     * Event will be serialized and sent to the client.
     *
     * @param event event to be written.
     * @return completion stage that completes when the event has been sent. If there is a problem during sending of
     * an event, completion stage will be completed exceptionally.
     */
    public CompletionStage<?> send(OutboundSseEvent event);

This implies that the act of sending the even can be asynchronous (otherwise why bother returning a CompletionStage at all?), and completion can only be obtained by listening for the returned CompletionStage. However, the spec then shows this usage example:

@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
  executor.execute(() -> {
     try (SseEventSink sink = eventSink) {
       eventSink.send(sse.newEvent("event1"));
       eventSink.send(sse.newEvent("event2"));
       eventSink.send(sse.newEvent("event3"));
     }
  });
 }

Which is not consistent with the documentation (and I hope, intent) of send because it starts sending the second events before the first is sent, and then closes the event sink before any event had a chance to be sent if they are asynchronous.

The proper example should be:

@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
  executor.execute(() -> {
    eventSink.send(sse.newEvent("event1"))
     .thenCompose(v -> eventSink.send(sse.newEvent("event2")))
     .thenCompose(v -> eventSink.send(sse.newEvent("event3")))
     .whenComplete((v, err) -> eventSink.close());
  });
 }

Similarly, the docs for SseEventSink.close are not consistent with the docs of send, because they say:

     * Subsequent calls have no effect and are ignored. Once the {@link SseEventSink} is closed,
     * invoking any method other than this one and {@link #isClosed()} would result in
     * an {@link IllegalStateException} being thrown.

While I believe the proper way to handle this in send is to not throw but return a failed CompletionStage.

It's quite confusing that SseEventSink implements AutoCloseable but I suppose if you add .toCompletableFuture().get() to every call to send you can use the try-with-resource to close it. It's quite a trap though.

The most curious thing is that SseEventSink.close does not return a CompletionStage and does not mention async at all. If send can be asynchronous, then close has a completely unspecified behaviour: should it implicitly block? Can it close before queued events are sent? We can make it return a CompletionStage and either specify that it closes after all queued events are sent, or that it closes as soon as possible, ignoring any queued events, or simply specify that it should only get called after queued events have been awaited for.

SseBroadcaster has a similar gray area in broadcast which implies that the returned CompletionStage indicates completion of the individual send operations, but doesn't explain what happens to individual errors. It appears that at least one implementation will forward all errors to the listeners and not to the resulting CompletionStage, which is a bit counter-intuitive to users of CompletionStage, but at least the spec should clarify that.

FroMage avatar Sep 20 '19 13:09 FroMage

First, I agree that the Javadocs can be improved. Having said that, I think the original intent of this API was to separate queueing from network delivery --much like you would in any system based on event loops.

Calling send() accepts the message for queueing but does not guarantee network delivery. It follows that calling send() multiple times does guarantee ordering (a queue). The resulting CompletionStage's are used to monitor network delivery, but only in those cases where that is necessary. Finally, close() should flush the queue before returning.

Perhaps others can comment on this too. In any case, as stated above, Javadocs could be improved.

spericas avatar Sep 20 '19 13:09 spericas

Guaranteeing the queue ordering is a good thing.

As for close being synchronous instead of async, it would permit people to ignore the values returned from send but it would completely defeat any possibility of writing being async. After all, if send queue things without blocking, close would then be guaranteed to block and defeat the whole purpose of send being async in the first place, no?

FroMage avatar Sep 20 '19 14:09 FroMage