reactor-core icon indicating copy to clipboard operation
reactor-core copied to clipboard

Need an alternative to mergeComparing that can compare 2 statically-prioritized infinite sources

Open osi opened this issue 4 years ago • 2 comments

Expected Behavior

For asynchronous sources, when there is demand from downstream, mergeComparing compares the mostly recently emitted value from all of its sources, and then sends that value downstream. If a source hasn't yet emitted an element, it is effectively skipped.

Actual Behavior

mergeComparing does not emit downstream until there is a value from all of its sources. If there are two infinite sources that each have a fixed priority for their values, it will never consume from the lower-priority source.

Steps to Reproduce

(apologies for the larger times; this seems to mirror how the existing async test is written as withVirtualTime doesn't play nicely with this operator)

    @Test
    void shouldEmitAsyncValuesAsTheyArrive() {
        StepVerifier.create(Flux.defer(() -> {
                        var catsAreBetterThanDogs = Flux.just(
                                                            "pickles", // 700
                                                            "leela", // 1400
                                                            "girl", // 2100
                                                            "meatloaf", // 2800
                                                            "henry" // 3500
                                                        )
                                                        .delayElements(Duration.ofMillis(700))
                                                        .map(s -> Tuples.of(s, 0));

                        var poorDogs = Flux.just(
                                               "spot", // 300
                                               "barley", // 600
                                               "goodboy", // 900
                                               "sammy", // 1200
                                               "doug" // 1500
                                           )
                                           .delayElements(Duration.ofMillis(300))
                                           .map(s -> Tuples.of(s, 1));

                        return Flux.mergeComparing(Comparator.comparing(Tuple2::getT2),
                                                   catsAreBetterThanDogs,
                                                   poorDogs)
                                   .map(Tuple2::getT1);
                    }))
                    .expectNext("spot")
                    .expectNext("barley")
                    .expectNext("pickles")
                    .expectNext("goodboy")
                    .expectNext("sammy")
                    .expectNext("leela")
                    .expectNext("doug")
                    .expectNext("girl")
                    .expectNext("meatloaf")
                    .expectNext("henry")
                    .expectComplete()
                    .verify(Duration.ofSeconds(10));

    }

Possible Solution

https://github.com/reactor/reactor-core/blob/3cda8764f66c3785f9d2d59cd673bf8370fcbb84/reactor-core/src/main/java/reactor/core/publisher/FluxMergeComparing.java#L288 nonEmpty should be > 0

Your Environment

  • Reactor version(s) used: 3.4.10

osi avatar Oct 27 '21 17:10 osi

@osi I'm trying to use this operator too, I'm currently using akka stream's mergePrioritized

which gives you :

spot
barley
goodboy
sammy
doug
pickles
leela
girl
meatloaf
henry

with current reactor:

pickles
spot
barley
goodboy
sammy
leela
doug
girl
meatloaf
henry

And akka's mergeSorted has the same behavior as reactor.

He-Pin avatar Nov 02 '21 09:11 He-Pin

yeah I don't think we can make that change, it would break the behavior of the operator (same as in that akka-streams issue)... merge + comparison + infinite stream is a puzzler.

I currently don't have the time to work on a new operator that would fit that bill. Maybe if the operator's full behavior can be more formally specified (including not only the simple case of 2 sources, but extending to N sources with downstream backpressure...)

simonbasle avatar Nov 09 '21 17:11 simonbasle