rest
rest copied to clipboard
SSE API issues with async operations
I don't think the spec is consistent with async programming or proper usage of CompletionStage
when it comes to SSE handling. For example, this documentation of SseEventSink
:
/**
* Send an outbound Server-sent event to this sink.
* <p>
* Event will be serialized and sent to the client.
*
* @param event event to be written.
* @return completion stage that completes when the event has been sent. If there is a problem during sending of
* an event, completion stage will be completed exceptionally.
*/
public CompletionStage<?> send(OutboundSseEvent event);
This implies that the act of sending the even can be asynchronous (otherwise why bother returning a CompletionStage
at all?), and completion can only be obtained by listening for the returned CompletionStage
. However, the spec then shows this usage example:
@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
executor.execute(() -> {
try (SseEventSink sink = eventSink) {
eventSink.send(sse.newEvent("event1"));
eventSink.send(sse.newEvent("event2"));
eventSink.send(sse.newEvent("event3"));
}
});
}
Which is not consistent with the documentation (and I hope, intent) of send
because it starts sending the second events before the first is sent, and then closes the event sink before any event had a chance to be sent if they are asynchronous.
The proper example should be:
@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
executor.execute(() -> {
eventSink.send(sse.newEvent("event1"))
.thenCompose(v -> eventSink.send(sse.newEvent("event2")))
.thenCompose(v -> eventSink.send(sse.newEvent("event3")))
.whenComplete((v, err) -> eventSink.close());
});
}
Similarly, the docs for SseEventSink.close
are not consistent with the docs of send
, because they say:
* Subsequent calls have no effect and are ignored. Once the {@link SseEventSink} is closed,
* invoking any method other than this one and {@link #isClosed()} would result in
* an {@link IllegalStateException} being thrown.
While I believe the proper way to handle this in send
is to not throw but return a failed CompletionStage
.
It's quite confusing that SseEventSink
implements AutoCloseable
but I suppose if you add .toCompletableFuture().get()
to every call to send
you can use the try-with-resource
to close it. It's quite a trap though.
The most curious thing is that SseEventSink.close
does not return a CompletionStage
and does not mention async at all. If send
can be asynchronous, then close
has a completely unspecified behaviour: should it implicitly block? Can it close before queued events are sent? We can make it return a CompletionStage
and either specify that it closes after all queued events are sent, or that it closes as soon as possible, ignoring any queued events, or simply specify that it should only get called after queued events have been awaited for.
SseBroadcaster
has a similar gray area in broadcast
which implies that the returned CompletionStage
indicates completion of the individual send
operations, but doesn't explain what happens to individual errors. It appears that at least one implementation will forward all errors to the listeners and not to the resulting CompletionStage
, which is a bit counter-intuitive to users of CompletionStage
, but at least the spec should clarify that.
First, I agree that the Javadocs can be improved. Having said that, I think the original intent of this API was to separate queueing from network delivery --much like you would in any system based on event loops.
Calling send()
accepts the message for queueing but does not guarantee network delivery. It follows that calling send()
multiple times does guarantee ordering (a queue). The resulting CompletionStage
's are used to monitor network delivery, but only in those cases where that is necessary. Finally, close()
should flush the queue before returning.
Perhaps others can comment on this too. In any case, as stated above, Javadocs could be improved.
Guaranteeing the queue ordering is a good thing.
As for close
being synchronous instead of async, it would permit people to ignore the values returned from send
but it would completely defeat any possibility of writing being async. After all, if send
queue things without blocking, close
would then be guaranteed to block and defeat the whole purpose of send
being async in the first place, no?