Document using `.subscribe` for `fire-and-forget` scenarios
Documentation Issue
Sometimes in various "Reactive" projects I see usage of such pattern as
Mono<Void> initiateSideEffects(T arg) {
thirdPartyService.sideEffectOperation(arg)
.subscribe();
yetAnotherService.sideEffectOperation(arg)
.subscribe();
return Mono.empty();
}
I could not find any mention in the Reference documentation regarding such usage of .subscribe(). All instances of .subscribe() in Reference documentation seem to refer to calls from imperative blocking code, to observe Publisher-s behavior.
How safe is such usage from the Reactive code though? What if GC gets to this Disposable and purges it, what happens to underlying Subscription and / or Publisher?
I've asked this question on SO, and the only answer I've got thus far seems to imply that as long as .subscribeOn is used on the Publisher with an appropriate Scheduler, all should be fine.
Can we please have this covered in the Reference documentation too?
Improvement Suggestion
Document "proper" (idiomatic) implementation of "fire-and-forget" scenario in Reactive code (closest analogy in imperative code would be to run a Thread with some Runnable and move on; or an @Async Spring annotation)
Document implication of calling .subscribe() and "forgetting" the returned Disposable, both in assembly-phase code and in subscription-phase code.
Additional context
SO ticket: https://stackoverflow.com/questions/75434622/is-it-safe-to-just-subscribe-to-publishers-in-the-assembly-phase-and-leave-it-at/75445943
Hi, @62mkv!
In general, you should chain all the calls since the actual downstream in your chain may want to control the lifecycle of all allocated streams. You should follow that rule unless you know that the subscription is something cheap or should not be canceled or managed by the downstream.
According to your suggestions - there is no concept of "idiomatic" fnf in reactive streams. Fnf call assumes no control over the running process (including any possibility of terminating it earlier). That says Reactive Streams does not fit well there since A - such a Mono should complete immediately upon termination or B - if it is not, it should be terminatable which means you may want to keep returned Disposable somewhere. Also, don't forget about errors and the supervising concept which means all of them to have to be handled somehow.
So, by writing to that point I'm not even sure such an FNF scenario should be added to the docs.
My suggestion is to change the underlying implementation to either have a non-reactive API or to make the response completed immediately.
Alternatively, the code you wrote should have a kind of dead-letter queue where all the errors can be delivered later, e.g.
final ErrorSubscriber errorSubs = ...
Mono<Void> initiateSideEffects(T arg) {
thirdPartyService.sideEffectOperation(arg)
.subscribe(errorSubs);
yetAnotherService.sideEffectOperation(arg)
.subscribe(errorSubs);
return Mono.empty();
}
where errorSubscriber can deliver exceptions somewhere (logs, another service, etc).
P.S. I highly suggest rethinking the API. Returning Mono.empty and keeping Mono contract makes no sense unless you know that there is another implementation which may have different Mono behavior, so you just keep the contract for that purpose
Hi @OlegDokuka thanks for your comments.
Let me be more specific: we use such patterns currently in situations where we want to do something not-too-critical, but potentially time-consuming, in a context where completion speed is more important than the actual outcome. Say, like notifying third party via WebClient call from a Kafka consumer. If we were to chain all the calls, even if one of them times out, this might have detrimental effect on the event consumption.
By your comments, I assume that if we make sure that every "discarded" subscription is wrapped in a catch-all .onErrorResume or similar handler, it should be relatively safe to use?
What about also applying .subscribeOn(Schedulers.boundedElastic()) - would that make it safer yet?
More to the point: I am not saying that this usage should be encouraged; rather I am trying to collect evidence suggesting otherwise, that I could then show to my co-workers and suggest that we probably should not do this. And a note in reference documentation would certainly be helpful in this respect.
Thanks again!
PS: concerning API: the code in my example could indeed be re-written using just void return type. And it has issues indeed, if called at inappropriate time, e.g. assembly phase. So probably it would be a tiny bit better if it were written like this instead:
Mono<Void> initiateSideEffects(T arg) {
return Mono.defer(() -> {
thirdPartyService.sideEffectOperation(arg)
.subscribe(errorSubs);
yetAnotherService.sideEffectOperation(arg)
.subscribe(errorSubs);
return Mono.empty();
});
}
with this code at least nothing happens before subscribe is called.