reactor-core icon indicating copy to clipboard operation
reactor-core copied to clipboard

Sync fusion support in reworked lazy mono stack

Open OlegDokuka opened this issue 3 years ago • 1 comments

Sync fusion can be a nice improvement for the reworked lazy mono for scenarios where the source is static data source

follow up for #3081

OlegDokuka avatar Aug 03 '22 15:08 OlegDokuka

Disposable d = Mono.fromFuture(future) // source is fn 
                                    .flatMap() // flat map can not fuse since the source is just push based observable
                                    .subscribe();
                                    // 5 sec wait
                                    d.dispose() // after some time

NO FUSION

subscriber call source.subscribe() source call subscriber.onSubscribe() subscriber call subscription.requestFusion() subscriber call subscription.request() subscription call subscriber.onNext() subscription call subscriber.onComplete()

Disposable d = Mono.fromFuture(future)
                                    .cache() // can be represented as a queue
                                    .flatMap() // flat map fuses with the source and treat it as a queue
                                    .subscribe();
                                    // 5 sec wait
                                    d.dispose() // after some time

ASYNC FUSION (makes no sense, just brings redundancy)

subscriber call source.subscribe() source call subscriber.onSubscribe() subscriber call subscription.requestFusion() subscriber call subscription.request() subscription call subscriber.onNext(null) subscriber call subscription.poll() // extra step here subscription call subscriber.onComplete()

Disposable d = Mono.fromCallable(future) // source is fn 
                                    .flatMap() // flat map fuses with the source is callable
                                    .subscribe();
                                    // 5 sec wait
                                    d.dispose() // after some time

Callable macro fusion (a.k.a MONO SYNC FUSION) (means value is generated synchronously via function call or scalar)

subscriber call source.subscribe() subscriber call source.call()

OlegDokuka avatar Oct 27 '22 13:10 OlegDokuka