reactive-streams-commons

Gem 22) discussion

Open akarnokd opened this issue 9 years ago • 5 comments

From @dsyer

Interesting. Do you mean Mono.fromCallable() (because when I looked at the source code it seemed to me that the Callable.call() only happens when there is a subscribe())? In general, what's a good way to assert or inspect statements like that? What did I misunderstand?

akarnokd • May 27 '16 08:05

Gem 22 handles the case where there is a subscriber but it hasn't called request() yet. fromCallable runs the callable immediately but defers the emission. This gem defers the execution itself: the front source emits only when requested, and that value then triggers the execution of the callable.

akarnokd • May 27 '16 08:05

Gem 22 link: https://github.com/reactor/reactive-streams-commons/issues/21#issuecomment-222080500

dsyer • May 27 '16 09:05

Here's the code from Gem 22 (with a concrete supplier and a log instead of using sysout):

        Callable<String> callable = () -> "foo";
        Mono.just("irrelevant").log().map(unused -> {
            try {
                return callable.call();
            } catch (Exception ex) {
                throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
            }
        }).subscribe(log::info, Throwable::printStackTrace);

and here's the output:

10:06:25.504 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.util.ScalarSubscription@5442a311)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  onNext(irrelevant)
10:06:25.506 [main] INFO com.example.MonoFeaturesTests - foo
10:06:25.506 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

Here's the vanilla fromCallable() (as I understood Gem 22, this should behave differently):

        Mono.fromCallable(callable).log().subscribe(log::info, Throwable::printStackTrace);

and the output:

10:05:13.317 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.subscriber.DeferredScalarSubscriber@1794d431)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  onNext(foo)
10:05:13.318 [main] INFO com.example.MonoFeaturesTests - foo
10:05:13.318 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

What's the difference?

dsyer • May 27 '16 09:05

Updated the gem to show the effect if the request is actually delayed.

akarnokd • May 27 '16 09:05
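
For readers following along, the difference only shows up when request() arrives some time after onSubscribe(). The following is a minimal sketch of that situation and not the code from the gem: it uses the JDK's java.util.concurrent.Flow interfaces instead of Reactor, and the names DelayedRequestDemo, eagerCall, deferredCall and OneShot are hypothetical helpers made up for illustration. eagerCall models the behaviour described above for fromCallable (call at subscribe time, emit on request); deferredCall models the gem's behaviour (call only once a request arrives). It assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

final class DelayedRequestDemo {

    /** Hypothetical model of "run immediately, defer the emission". */
    static <T> Flow.Publisher<T> eagerCall(Callable<T> callable) {
        return subscriber -> {
            T value;
            try {
                value = callable.call();              // executes at subscribe time
            } catch (Exception ex) {
                // Still hand out a subscription first, then fail on request.
                subscriber.onSubscribe(new OneShot<T>(subscriber, () -> { throw ex; }));
                return;
            }
            subscriber.onSubscribe(new OneShot<T>(subscriber, () -> value));
        };
    }

    /** Hypothetical model of "defer the execution until requested". */
    static <T> Flow.Publisher<T> deferredCall(Callable<T> callable) {
        return subscriber -> subscriber.onSubscribe(new OneShot<T>(subscriber, callable));
    }

    /** Emits the result of the source once, on the first request. Not thread-safe. */
    private static final class OneShot<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Callable<T> source;
        private boolean done;

        OneShot(Flow.Subscriber<? super T> subscriber, Callable<T> source) {
            this.subscriber = subscriber;
            this.source = source;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            done = true;
            if (n <= 0) {
                subscriber.onError(new IllegalArgumentException("n must be positive"));
                return;
            }
            T value;
            try {
                value = source.call();                // deferred variant executes here
            } catch (Exception ex) {
                subscriber.onError(ex);
                return;
            }
            subscriber.onNext(value);
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Subscribes, holds the subscription, requests later, and records the event order. */
    static List<String> run(boolean eager) {
        List<String> events = new ArrayList<>();
        Callable<String> callable = () -> {
            events.add("call()");
            return "foo";
        };
        Flow.Publisher<String> source = eager ? eagerCall(callable) : deferredCall(callable);
        Flow.Subscription[] held = new Flow.Subscription[1];
        source.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); held[0] = s; }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        events.add("request(1)");                     // the delayed request
        held[0].request(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + run(true));
        System.out.println("deferred: " + run(false));
    }
}
```

In the eager variant call() is recorded before request(1); in the deferred variant it is recorded after it. With an immediate unbounded request, as in the two log outputs above, the two orderings are hard to tell apart, which is why the logs look the same.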

I see, thanks.

dsyer • May 27 '16 09:05