reactive-streams-commons
reactive-streams-commons copied to clipboard
Gem 22) discussion
From @dsyer
Interesting. Do you mean Mono.fromCallable() (because when I looked at the source code it seemed to me that the Callable.call() only happens when there is a subscribe())? In general, what's a good way to assert or inspect statements like that? What did I misunderstand?
Gem 22 handles the case when there is a subscriber but it hasn't called request() yet. fromCallable runs immediately but defers the emission. This gem defers the execution because the front just emits only when requested, then that value triggers the execution of the callable
Gem 22 link: https://github.com/reactor/reactive-streams-commons/issues/21#issuecomment-222080500
Here's the code from Gem 22 (with a concrete supplier and a log instead of using sysout):
Callable<String> callable = () -> "foo";
Mono.just("irrelevant").log().map(unused -> {
try {
return callable.call();
} catch (Exception ex) {
throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
}
}).subscribe(log::info, Throwable::printStackTrace);
and here's the output:
10:06:25.504 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.util.ScalarSubscription@5442a311)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog - onNext(irrelevant)
10:06:25.506 [main] INFO com.example.MonoFeaturesTests - foo
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog - onComplete()
Here's the vanilla fromCallable() (as I understood Gem 22 this should be different)
Mono.fromCallable(callable).log().subscribe(log::info, Throwable::printStackTrace);
and the output:
10:05:13.317 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.subscriber.DeferredScalarSubscriber@1794d431)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog - onNext(foo)
10:05:13.318 [main] INFO com.example.MonoFeaturesTests - foo
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog - onComplete()
What's the difference?
Updated the gem to show the effect if the request is actually delayed.
I see, thanks.