[Suggestion] Create an operator that merges multiple ordered Fluxes into a single Flux, with optional fields for sources that have gaps in their keys
Combine a `Flux.zip`-like operator with a key-selecting variant of `Flux.mergeComparing`, so that publishers can be merged based on keys, for both finite and unbounded sources of any combination of lengths.
## Motivation
I use Reactor every day in my data pipeline work, to great success. The lazy operators are amazing at handling complex merge operations across many distinct sources. One thing I run into, however, is the case where I am trying to fan-in multiple sources of data that have different lengths and mismatched (but ordered) keys.
### Example use case
An example of this would be merging four different JSON arrays, where a "match key" is missing from some of the sets, or where some of the sets have totally different lengths and would otherwise short-circuit the zip early.
I have used `Flux.groupBy` in the past, but that doesn't work in an unbounded Flux case.
I tend to create a custom interleave for these situations, but a generic solution would be incredibly helpful.
## Desired solution
An example signature for this kind of operator that I have experimented with:
```java
/**
 * This operator merges 4 different Flux sources into a single Flux based on matching keys.
 * If a source does not have a matching key, or has ended early, an empty Optional is
 * returned for its slot.
 * The sources do not have to be the same length, and may have different (but ordered) keys.
 * <br>
 * Each source is read until its end.
 * It is assumed that all the sources are already ordered, and that K is comparable.
 * @param <K> the key type; required to be comparable. The smallest value is picked to combine
 * @param <T1> type of the value from source1
 * @param <T2> type of the value from source2
 * @param <T3> type of the value from source3
 * @param <T4> type of the value from source4
 * @param source1 the first Publisher source to combine values from
 * @param source2 the second Publisher source to combine values from
 * @param source3 the third Publisher source to combine values from
 * @param source4 the fourth Publisher source to combine values from
 * @param prefetch the minimum size of the internal queue per source
 * @return a Flux based on the produced combinations
 */
public static <K extends Comparable<? super K>, T1, T2, T3, T4>
Flux<Tuple5<K, Optional<T1>, Optional<T2>, Optional<T3>, Optional<T4>>>
zipOnKeyOptional(Flux<? extends Map.Entry<K, T1>> source1,
                 Flux<? extends Map.Entry<K, T2>> source2,
                 Flux<? extends Map.Entry<K, T3>> source3,
                 Flux<? extends Map.Entry<K, T4>> source4,
                 int prefetch);
```
## Desired output
```mermaid
---
title: s
---
stateDiagram-v2
    sourceOne --> Combiner
    sourceTwo --> Combiner
    sourceThree --> Combiner
    sourceFour --> Combiner
    state sourceOne {
        s11: (1,1)
        s12: (2,2)
        s13: (3,3)
        s14: (4,4)
        s15: (5,5)
        [*] --> s11
        s11 --> s12
        s12 --> s13
        s13 --> s14
        s14 --> s15
        s15 --> [*]
    }
    state sourceTwo {
        [*] --> [*]
    }
    state sourceThree {
        s31: (1,1)
        s32: (2,2)
        s33: (4,4)
        [*] --> s31
        s31 --> s32
        s32 --> s33
        s33 --> [*]
    }
    state sourceFour {
        s41: (1,1)
        s42: (3,3)
        s43: (4,4)
        [*] --> s41
        s41 --> s42
        s42 --> s43
        s43 --> [*]
    }
    state Combiner {
        sc1: 1 [1, null, 1, 1]
        sc2: 2 [2, null, 2, null]
        sc3: 3 [3, null, null, 3]
        sc4: 4 [4, null, 4, 4]
        sc5: 5 [5, null, null, null]
        [*] --> sc1
        sc1 --> sc2
        sc2 --> sc3
        sc3 --> sc4
        sc4 --> sc5
        sc5 --> [*]
    }
```
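To make the diagram's semantics concrete, here is a minimal, non-reactive sketch of the selection rule (plain lists, `Integer` keys, no backpressure): each round emits the smallest key among the current heads, consuming only the heads that match it and filling the other slots with `Optional.empty()`. All names here are illustrative, not a proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

public class KeyZipSketch {

    /** One emitted row: the key plus one Optional slot per source. */
    public record Row(int key, List<Optional<Integer>> values) {}

    /** Each source is a key-ordered list of Map.Entry; emits rows until all are drained. */
    public static List<Row> zipOnKey(List<List<Map.Entry<Integer, Integer>>> sources) {
        List<Row> out = new ArrayList<>();
        int[] pos = new int[sources.size()]; // head index per source
        while (true) {
            // Find the smallest key among the current heads.
            OptionalInt min = OptionalInt.empty();
            for (int i = 0; i < sources.size(); i++) {
                if (pos[i] < sources.get(i).size()) {
                    int k = sources.get(i).get(pos[i]).getKey();
                    if (min.isEmpty() || k < min.getAsInt()) min = OptionalInt.of(k);
                }
            }
            if (min.isEmpty()) break; // all sources exhausted
            int key = min.getAsInt();
            List<Optional<Integer>> row = new ArrayList<>();
            for (int i = 0; i < sources.size(); i++) {
                if (pos[i] < sources.get(i).size()
                        && sources.get(i).get(pos[i]).getKey() == key) {
                    row.add(Optional.of(sources.get(i).get(pos[i]).getValue()));
                    pos[i]++; // consume this head; slot is "replenished" next round
                } else {
                    row.add(Optional.empty()); // gap in keys, or exhausted source
                }
            }
            out.add(new Row(key, row));
        }
        return out;
    }
}
```

Running this against the four sources in the diagram reproduces the five Combiner rows above.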
## Test case
```java
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple5;
import reactor.util.function.Tuples;

import java.util.Map;
import java.util.Optional;

import static java.util.Map.entry;
import static java.util.Optional.of;
import static java.util.Optional.empty;

public class ReactiveUtilsTest {

    @Test
    void testZipOnKeyOptional() {
        Flux<Map.Entry<Integer, Integer>> fluxOne = Flux.range(1, 5).map(i -> entry(i, i));
        Flux<Map.Entry<Integer, Integer>> fluxTwo = Flux.empty();
        Flux<Map.Entry<Integer, Integer>> fluxThree = Flux.just(entry(1, 1), entry(2, 2), entry(4, 4));
        Flux<Map.Entry<Integer, Integer>> fluxFour = Flux.just(entry(1, 1), entry(3, 3), entry(4, 4));

        Flux<Tuple5<Integer, Optional<Integer>, Optional<Integer>, Optional<Integer>, Optional<Integer>>>
                actual = Flux.zipOnKeyOptional(fluxOne, fluxTwo, fluxThree, fluxFour, 4);

        StepVerifier.create(actual)
                .expectNext(Tuples.of(1, of(1), empty(), of(1), of(1)))
                .expectNext(Tuples.of(2, of(2), empty(), of(2), empty()))
                .expectNext(Tuples.of(3, of(3), empty(), empty(), of(3)))
                .expectNext(Tuples.of(4, of(4), empty(), of(4), of(4)))
                .expectNext(Tuples.of(5, of(5), empty(), empty(), empty()))
                .verifyComplete();
    }
}
```
## Considered alternatives
- `Flux.groupBy` doesn't work in unbounded / infinite publisher situations.
- Grouped Flux instances also can't be joined in a structured-concurrency kind of way, like `Mono.zip`.
- I typically implement these functions by:
  - having each flux mapped to a marker interface that allows me to apply them to a POJO builder;
  - merging the Flux instances, now cast to the marker interface, with an operator like `Flux.mergeComparingDelayError(...)`;
  - using `Flux.windowUntilChanged` to group the entities;
  - `flatMap` with a `reduceWith` accumulator to build the tuple out; the mapped object is aligned key-wise, and has a sane default of `Optional.empty()` for unmatched fields.
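For readers unfamiliar with that workaround, here is a non-reactive analogue of it: a source tag plays the role of the marker interface, a stable sort stands in for `Flux.mergeComparingDelayError`, and collecting runs of equal keys stands in for `Flux.windowUntilChanged` plus the reducing accumulator. This is only a sketch of the shape of the data flow, not the actual reactive implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class MergeWindowSketch {

    /** Stand-in for the marker interface: a value tagged with its source index. */
    record Tagged(int source, int key, int value) {}

    public static Map<Integer, List<Optional<Integer>>> mergeWindowReduce(
            List<List<Map.Entry<Integer, Integer>>> sources) {
        // 1. "mergeComparing": flatten with source tags, then a stable sort by key
        //    (each source is assumed to be key-ordered already).
        List<Tagged> merged = new ArrayList<>();
        for (int s = 0; s < sources.size(); s++)
            for (Map.Entry<Integer, Integer> e : sources.get(s))
                merged.add(new Tagged(s, e.getKey(), e.getValue()));
        merged.sort(Comparator.comparingInt(Tagged::key));

        // 2. "windowUntilChanged" + reduce: each run of equal keys becomes one row,
        //    with every slot defaulted to Optional.empty() before filling.
        Map<Integer, List<Optional<Integer>>> rows = new LinkedHashMap<>();
        for (Tagged t : merged) {
            List<Optional<Integer>> row = rows.computeIfAbsent(t.key(), k -> {
                List<Optional<Integer>> r = new ArrayList<>();
                for (int i = 0; i < sources.size(); i++) r.add(Optional.empty());
                return r;
            });
            row.set(t.source(), Optional.of(t.value()));
        }
        return rows;
    }
}
```

Note how, exactly as described in the thread, the per-source "slot" information only survives because of the explicit tag; without it the merge loses track of which publisher a value came from.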
Hi, @vibbix!
Thanks for sharing your interesting use case! Am I understanding correctly that you need a zip variant which keeps zipping until the longest source is done, while the sources that have already ended return a fallback or null value?
Cheers, Oleh
Wondering whether the existing API ideas can be used to achieve this result with the combinator variants, e.g.
`zip(Function<? super Object[], ? extends O> combinator, Publisher<? extends I>... sources)`
when made configurable to wait for the last active source instead of terminating upon the first one finishing. The combinator would probably need to return a structure that contains both the output tuple and a decision object that says, for each particular source, whether the value was consumed or should be reused for another zipping round.
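One possible shape for such a decision object, sketched on top of the key-matching use case from this issue: the combinator sees one candidate head per active source and reports, per slot, whether that value was consumed or should be offered again next round. Every name here (`RoundResult`, `smallestKeyRound`) is invented for illustration; nothing like it exists in Reactor today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ZipDecisionSketch {

    /** Result of one zipping round: the emitted row plus per-slot decisions. */
    public record RoundResult<O>(O output, boolean[] consumed) {}

    /**
     * A combinator that keys on Map.Entry and consumes only the heads carrying
     * the smallest key. An empty head means that source has already completed.
     * Assumes at least one head is present.
     */
    public static RoundResult<List<Optional<Integer>>> smallestKeyRound(
            List<Optional<Map.Entry<Integer, Integer>>> heads) {
        int min = heads.stream().flatMap(Optional::stream)
                .mapToInt(Map.Entry::getKey).min().orElseThrow();
        boolean[] consumed = new boolean[heads.size()];
        List<Optional<Integer>> out = new ArrayList<>();
        for (int i = 0; i < heads.size(); i++) {
            if (heads.get(i).isPresent() && heads.get(i).get().getKey() == min) {
                out.add(Optional.of(heads.get(i).get().getValue()));
                consumed[i] = true;        // slot would be replenished next round
            } else {
                out.add(Optional.empty()); // head (if any) is reused next round
            }
        }
        return new RoundResult<>(out, consumed);
    }
}
```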
The notion of zipping on key is quite limiting potentially.
@OlegDokuka Having a `Flux.zipDelayComplete`, i.e. being able to zip data sources with default fallback values for mismatched lengths, would be very useful too. I tend to hack this together with something along the lines of `Flux.concat(source1, Flux.create(...))`, where the created Flux produces `Map.Entry<Integer, Optional<T>>` values of `(Integer.MAX_VALUE, Optional.empty())` until the parent subscription cancels. It involves a lot of unboxing, but I built some static helpers to make it easier.
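The effect of that sentinel hack can be shown with a non-reactive sketch: padding every source to the longest length (the padding plays the role of the `(Integer.MAX_VALUE, Optional.empty())` sentinel entries) means a plain positional zip never short-circuits. The name `zipLongest` is made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SentinelPadSketch {

    /** Positional zip that runs to the longest source, padding short ones with empty. */
    public static List<List<Optional<Integer>>> zipLongest(List<List<Integer>> sources) {
        int longest = sources.stream().mapToInt(List::size).max().orElse(0);
        List<List<Optional<Integer>>> rows = new ArrayList<>();
        for (int i = 0; i < longest; i++) {
            List<Optional<Integer>> row = new ArrayList<>();
            for (List<Integer> src : sources) {
                // Past the end of a short source, the slot falls back to empty,
                // just as the sentinel entries would decode to Optional.empty().
                row.add(i < src.size() ? Optional.of(src.get(i)) : Optional.empty());
            }
            rows.add(row);
        }
        return rows;
    }
}
```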
@chemicL
> The combinator would probably need to return a structure that contains both the output tuple and a decision object that says, for each particular source, whether the value was consumed or should be reused for another zipping round.
This would be great, and if there were something like this where I could request that the output tuple "replenish" the producer slot that I consumed, I would build these sorts of functions on top of that.
As noted in Considered alternatives, I built out this functionality today using `Flux.mergeComparing` + `Flux.windowUntilChanged` with generic `Map.Entry` values and marker interfaces, but we lose some type safety and the concept of which "slot"/source publisher the result comes from.
In my case it's more about keeping a row-level/horizontal data structure intact. Making this a built-in Reactor operator would ensure the entire workflow is type-safe, that each source publisher has its backpressure dealt with correctly, and that buffer bloat is minimized.
> The notion of zipping on key is quite limiting potentially.
It could be a great shortcut for this common use case. I tend to have different types in each zipped source publisher, and even cases where I read different keys from the same object (although this is certainly a more unusual case). I have been workshopping different method signatures for a couple of months, and this is the closest I have gotten to a clean signature for an external implementation. Otherwise, each source Flux would need a corresponding `Function<? super T1, ? extends K>` key extractor.
@vibbix would you be so kind as to provide some more test cases covering corner cases, to help us better understand and consider a possible design? For one, I'm wondering if the keys can appear more than once.
If you'd be willing to provide the code for the implementation that works so far, that would also be beneficial.
@chemicL I created an example of what I tend to use now here: vibbix/rx-experiments. The attached README.md describes my thought process in the design as well. I have also been working on some examples of what a coordinator structure could look like to handle the incoming values.
> For one, I'm wondering if the keys can appear more than once.
In my design, I assume that any publisher that emits multiple values with the same key has to have them grouped beforehand.
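That precondition can be sketched as a simple pre-grouping pass: collapse runs of equal keys in an already-ordered source into one entry per key, so the key-zip operator only ever sees strictly increasing keys. This is an illustrative stand-in for doing the same with `Flux.windowUntilChanged` upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PreGroupSketch {

    /** Collapses consecutive entries sharing a key into one entry holding all values. */
    public static List<Map.Entry<Integer, List<Integer>>> groupConsecutive(
            List<Map.Entry<Integer, Integer>> ordered) {
        List<Map.Entry<Integer, List<Integer>>> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : ordered) {
            if (!out.isEmpty() && out.get(out.size() - 1).getKey().equals(e.getKey())) {
                // Same key as the previous run: append to its value list.
                out.get(out.size() - 1).getValue().add(e.getValue());
            } else {
                // New key: start a fresh (mutable) run.
                out.add(Map.entry(e.getKey(), new ArrayList<>(List.of(e.getValue()))));
            }
        }
        return out;
    }
}
```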
@vibbix thank you. We appreciate your input. We are in the planning process currently and will get back when we have some priorities. Just to get a sense of the work involved: are you interested in contributing something once we settle on a design, or would you expect the team or community to provide an implementation?