FluxZip fails to deliver error from concurrently running sources
For a Flux created by zipping two Monos, MonoA and MonoB, and then calling collectList():
Flux.zip(MonoA, MonoB).collectList()
if MonoB emits an error, the collectList() operator should emit that error instead of returning an empty list.
Expected Behavior
Flux.collectList() should emit the error emitted by MonoB.
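As a point of comparison only, the JDK's CompletableFuture.thenCombine offers the same all-or-error contract the report expects from Flux.zip(...).collectList(): if either input fails, the combined result fails, no matter which side finishes first or on which thread. The sketch below is an analogy, not Reactor code; the class ExpectedContract and the method combineAndCaptureFailure are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK analogy only: CompletableFuture.thenCombine stands in for Flux.zip. This is not Reactor code.
final class ExpectedContract {

    // Returns the failure seen by whoever consumes the combined result.
    static Throwable combineAndCaptureFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Like MonoA: yields Optional.empty() on another thread.
            CompletableFuture<Optional<Object>> a = CompletableFuture.supplyAsync(Optional::empty, pool);
            // Like MonoB: fails with a NullPointerException.
            CompletableFuture<Optional<Object>> b = CompletableFuture.failedFuture(new NullPointerException());
            // The combined stage fails if either input fails, whichever one finishes first.
            CompletableFuture<List<Optional<Object>>> both = a.thenCombine(b, (x, y) -> List.of(x, y));
            both.join();
            return null; // not reached: b always fails
        } catch (CompletionException e) {
            return e.getCause();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combineAndCaptureFailure().getClass().getSimpleName()); // NullPointerException
    }
}
```

Whatever the thread timing, the consumer always observes the NullPointerException and never a successful value.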
Actual Behavior
Sometimes Flux.collectList() returns an empty list instead of emitting the error.
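The symptom is specifically an empty list because collectList() emits whatever it has buffered once its upstream completes, including an empty list when nothing arrived. So if the zip completes without producing a tuple, the error never surfaces and [] does instead. The model below is a hypothetical simplification of that terminal handling; CollectListModel is an illustrative name, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of collectList()'s terminal handling; not Reactor's source.
final class CollectListModel<T> {
    private final List<T> buffer = new ArrayList<>();
    final List<String> downstream = new ArrayList<>();

    void onNext(T item) {
        buffer.add(item);
    }

    // Completion always yields the buffered list, even when nothing arrived.
    void onComplete() {
        downstream.add("onNext(" + buffer + ")");
        downstream.add("onComplete");
    }

    // An error replaces the list entirely; no list is emitted.
    void onError(Throwable t) {
        buffer.clear();
        downstream.add("onError(" + t.getClass().getSimpleName() + ")");
    }

    public static void main(String[] args) {
        CollectListModel<String> completed = new CollectListModel<>();
        completed.onComplete();                      // zip completed without a tuple
        System.out.println(completed.downstream);    // [onNext([]), onComplete]

        CollectListModel<String> failed = new CollectListModel<>();
        failed.onError(new NullPointerException());  // zip propagated MonoB's error
        System.out.println(failed.downstream);       // [onError(NullPointerException)]
    }
}
```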
Steps to Reproduce
Here is a unit test that reproduces the issue:
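import java.util.Optional;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

class FluxZipErrorRaceTest {

    // Assumption: the report does not show how PARALLEL_SCHEDULER is defined;
    // Schedulers.parallel() is used here as a stand-in.
    private static final Scheduler PARALLEL_SCHEDULER = Schedulers.parallel();

    @Test
    public void bug_Test() {
        final var numItem = 1000000;
        var fluxToTest = Flux.range(1, numItem)
                .flatMap(ignored -> {
                    var mono1 = Mono.empty()
                            .publishOn(PARALLEL_SCHEDULER) // the test passes if mono1 does not run on the parallel scheduler
                            .map(Optional::of)
                            .defaultIfEmpty(Optional.empty());
                    var mono2 = Mono.error(new NullPointerException())
                            .map(Optional::of)
                            .defaultIfEmpty(Optional.empty());
                    return Flux.zip(mono1, mono2).collectList().onErrorResume(e -> Mono.empty());
                })
                // upstream is expected to emit only error signals, all swallowed by onErrorResume()
                .flatMap(evt ->
                        Mono.error(new RuntimeException(
                                "Unexpected empty list returned by collectList of size %s".formatted(evt.size())))
                );
        StepVerifier.create(fluxToTest).expectNextCount(0).verifyComplete();
    }
}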
The above test always fails because an empty list reaches the last flatMap() block. Under the expected behavior, the onErrorResume() operator would replace every error signal with Mono.empty(), and no data would ever reach the last flatMap() block.
Possible Solution
Your Environment
- Reactor version(s) used: 3.6.9
- JVM version (java -version): java 21.0.5 2024-10-15 LTS, Java(TM) SE Runtime Environment (build 21.0.5+9-LTS-239), Java HotSpot(TM) 64-Bit Server VM (build 21.0.5+9-LTS-239, mixed mode, sharing)
- OS and version (e.g. uname -a): MSYS_NT-10.0-14393 wh-1eab6196ab 3.4.10-2e2ef940.x86_64 2024-07-09 21:35 UTC x86_64 Msys
Hey, thanks for the report. I am wondering, though: aren't you explicitly swallowing the error with the .onErrorResume(e -> Mono.empty()) that follows the .collectList() operator?
I expect the collectList() operator to always emit the error, so the onErrorResume() call should always turn it into an empty Mono.
I don't expect the last flatMap(evt -> ...) block to be triggered. However, the collectList() call occasionally returns an empty list, which triggers that last flatMap() call. That is not the expected behavior.
[EDIT] I misinterpreted the sequence of events in my initial evaluation. What follows is not what actually happens: the delay applies to the successful termination of the Mono that emits an item, not to the one that terminates with an error.
[ORIGINAL] Hmmm, I think your test is rather convoluted and makes it difficult to infer what's actually going on. Let me try to explain:
1. The fact that you needed to publish the error on the parallel scheduler, because otherwise the test would pass, indicates there is a timing issue in your code.
2. Because you delay the error, the Mono.zip operator terminates once mono1 completes with just the onComplete() signal and cancels mono2.
3. Due to the above, collectList has no chance of observing an error; it just gets the completion signal from the zip operator.
4. As the specification of collectList states, a completion signal from upstream gets transformed into an empty list signal downstream.
With the above explanation, I'm closing the report as invalid. Hope this explanation helps you resolve the issue you are facing.
Thanks for the update. Regarding point 2: could you tell me why the Flux.zip operator terminates once mono1 completes? In the code, mono1 should emit a value, Optional.empty(). Shouldn't the Flux.zip operator wait for a signal from mono2 (either data or error)? Why would Flux.zip cancel mono2 in this case? Thanks!
My apologies, I think I confused something there. Your questions are very helpful, and I do think you may in fact have discovered a bug: a race condition uncovered by these events:
- The first Mono completes with a genuine signal on Thread1.
- The second Mono errors on Thread2.
It appears that both threads are draining the results at the same time and breaking the contract. I will investigate further.
I will also update my previous comment to reflect this (the edit history remains viewable), since I wrongly assumed the error was delayed. It is actually the genuine signal that is delayed, and that is what triggers this condition.
Just for the record: the exclusivity of the drain looks correct, so the two threads are not draining at the same time. My earlier observation came from the natural concurrency in Flux.flatMap: two different, completely unrelated Flux.zip instances were draining at the same time.
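Drain exclusivity of this kind is usually implemented with the work-in-progress (WIP) idiom common in reactive operators: only the thread that moves a counter from 0 to 1 runs the drain loop, while other threads just increment the counter so the owner loops again. The sketch below assumes FluxZip follows that idiom; WipDrain, maxActive, passes and hammer are hypothetical names, not FluxZip's fields. It hammers one instance from several threads and records the highest number of threads ever inside the loop at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the work-in-progress (WIP) drain idiom; not FluxZip's source.
final class WipDrain {
    private final AtomicInteger wip = new AtomicInteger();
    private final AtomicInteger activeDrainers = new AtomicInteger();
    final AtomicInteger maxActive = new AtomicInteger(); // highest concurrency observed inside the loop
    final AtomicInteger passes = new AtomicInteger();    // loop iterations actually executed

    // Called from any thread whenever a source delivers a signal.
    void signal() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread owns the loop and will observe this increment
        }
        int missed = 1;
        do {
            int now = activeDrainers.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            passes.incrementAndGet();              // the real drain work would happen here
            activeDrainers.decrementAndGet();
            missed = wip.addAndGet(-missed);       // loop again if signals arrived meanwhile
        } while (missed != 0);
    }

    // Signals one instance from several threads that start together.
    static WipDrain hammer(int threads, int signalsPerThread) {
        WipDrain drain = new WipDrain();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    return;
                }
                for (int j = 0; j < signalsPerThread; j++) {
                    drain.signal();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return drain;
    }

    public static void main(String[] args) {
        WipDrain drain = hammer(4, 100_000);
        System.out.println("max concurrent drainers: " + drain.maxActive.get());
    }
}
```

Under contention maxActive stays at 1, while passes is typically below the total number of signals because one pass can absorb several missed signals. Exclusivity alone therefore does not rule out the race: a signal that arrives while another thread owns the loop is only handled if the owner re-checks the relevant state before terminating.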
The actual issue is that both threads attempt to complete the FluxZip operator, and the problematic sequence seems to be the following:
- T1: mono1 delivers an actual signal and starts the drain procedure. It doesn't notice any error, so it continues looking at the other results.
- T2: mono2 delivers the error; the error is stored in the zip operator, but T2 fails to gain exclusive access to the drain procedure because T1 is executing it.
- T1: the mono1 code path delivers the successful signal without noticing the error that was stored.
There should be some handling that drops one of the signals when the other thread is already handling termination.
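The T1/T2 sequence can be replayed deterministically in a simplified model. As an assumption, a failing source stores its error and then sets a volatile done flag, while the drainer checks for an error before it looks at the done flag; a hook injects T2's signal between those two checks. ZipRaceModel, source1Value, source2Error and betweenChecks are hypothetical names, and the remedy shown (re-reading the error after observing the done flag) is one possible approach, not necessarily the fix adopted in Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a two-source zip drain; NOT Reactor's FluxZip code.
// It replays the reported interleaving on one thread so the outcome is deterministic.
final class ZipRaceModel {
    final AtomicInteger wip = new AtomicInteger();
    final AtomicReference<Throwable> error = new AtomicReference<>();
    volatile boolean source2Done;            // written by source 2 AFTER it stores its error
    final List<String> downstream = new ArrayList<>();
    final boolean recheckErrorAfterDone;     // false = naive ordering, true = possible remedy
    Runnable betweenChecks = () -> {};       // stands in for T2 arriving mid-drain

    ZipRaceModel(boolean recheckErrorAfterDone) {
        this.recheckErrorAfterDone = recheckErrorAfterDone;
    }

    // T1: mono1 delivers its value and starts the drain procedure.
    void source1Value() {
        drain();
    }

    // T2: mono2 fails; the error is stored first, then the source is marked done.
    void source2Error(Throwable t) {
        error.compareAndSet(null, t);
        source2Done = true;
        drain(); // cannot enter while T1 holds the drain
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // exclusivity: another thread is already draining
        }
        int missed = 1;
        while (true) {
            if (error.get() != null) {       // T1's check: no error yet
                downstream.add("onError");
                return;                      // terminal: the loop never needs to run again
            }
            Runnable hook = betweenChecks;
            betweenChecks = () -> {};
            hook.run();                      // T2's error is stored right here
            if (source2Done) {               // source 2 finished without a value
                if (recheckErrorAfterDone && error.get() != null) {
                    downstream.add("onError");    // remedy: the stored error wins
                } else {
                    downstream.add("onComplete"); // naive: the stored error is never seen
                }
                return;
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        for (boolean remedy : new boolean[] {false, true}) {
            ZipRaceModel zip = new ZipRaceModel(remedy);
            zip.betweenChecks = () -> zip.source2Error(new NullPointerException());
            zip.source1Value();
            System.out.println((remedy ? "remedy: " : "naive:  ") + zip.downstream);
        }
    }
}
```

Running main prints [onComplete] for the naive ordering and [onError] with the re-check. In this model the re-check is sound because the error is written before the volatile write of source2Done, so a thread that reads source2Done == true is guaranteed to see the stored error. The signal-dropping approach suggested above could instead guard termination with a single compare-and-set, so that only one thread ever emits a terminal signal.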