reactor-core
Flux.delayElements() and parallel() cause a hang when an exception is thrown in flatMap()
If I remove either delayElements() or parallel(), block() doesn't hang.
Expected Behavior
It doesn't hang: the error is propagated and block() throws it instead of blocking forever.
Actual Behavior
block() hangs indefinitely.
Steps to Reproduce
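import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FluxTests {
    @Test
    void delayElementsParallelFlatMapAndThenOrderedWhenFlatMapThrowsExceptionCauseHang() {
        Set<String> iterable = new LinkedHashSet<>(Arrays.asList("apple", "banana", "orange"));
        List<String> collected = Flux.fromIterable(iterable).log()
                .delayElements(Duration.ofSeconds(1)).log()
                .parallel().log()
                .runOn(Schedulers.boundedElastic()).log()
                .flatMap((fruit) -> {
                    // Always throws: this is what triggers the hang
                    if (true) {
                        throw new RuntimeException("Error.");
                    }
                    return Mono.just(fruit).log();
                }).log()
                .ordered(Comparator.naturalOrder()).log()
                .collectList().log()
                .block(); // never returns
        assertThat(collected).containsExactlyInAnyOrderElementsOf(iterable);
    }
}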
See https://github.com/izeye/samples-java-branches/blob/d87faa814cc7c545a776496bff3eaf8de81fd6f5/src/test/java/learningtest/reactor/core/publisher/FluxTests.java#L261-L278
Possible Solution
None.
Your Environment
- Reactor version(s) used: 3.4.0
- Other relevant libraries versions (e.g. netty, ...): None
- JVM version (java -version):
  ➜ samples-java-branches git:(master) ✗ java -version
  openjdk version "11.0.7" 2020-04-14
  OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.7+10)
  OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.7+10, mixed mode)
- OS and version (e.g. uname -a):
  ➜ ~ uname -a
  Darwin xxx.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64
Simplified reproducer (it fails on 3.3.x as well):
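import java.time.Duration;
import java.util.Comparator;
import reactor.core.publisher.Flux;

public class SimplifiedReproducer {
    public static void main(String[] args) {
        // Expected: RuntimeException("boom") is rethrown; instead no signal arrives and blockFirst times out
        Flux.range(0, 3)
            // parallelism must be >= 2
            .parallel(2)
            .doOnNext(i -> {
                throw new RuntimeException("boom");
            })
            .log("afterFlatMap")
            .ordered(Comparator.naturalOrder())
            .blockFirst(Duration.ofSeconds(1));
    }
}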
See also https://github.com/reactor/reactor-core/issues/1958
This seems to be a slightly different issue. The probable root cause is that ParallelSourceInner cancels the whole source when an individual rail cancels. In @bsideup's example, the exception thrown in doOnNext cancels that rail, but as the cancel signal propagates upstream it effectively cancels ALL rails. As a result, no further progress is made and ordered() appears to hang.
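The cancellation path described above can be illustrated with a toy model in plain Java. Everything in it (`SharedSourceModel`, `cancelFromRail`, `run`) is hypothetical and is not Reactor's `ParallelSource` code; it only assumes the behavior described in the comment: a cancel from any single rail cancels the shared source, so the remaining rails receive neither further items nor a completion signal.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Reactor's ParallelSource): a shared source dispatches
// items round-robin to rails, and a cancel from ANY rail stops the whole source.
class SharedSourceModel {
    final List<List<Integer>> received = new ArrayList<>();
    final boolean[] completed;
    private boolean cancelled;

    SharedSourceModel(int rails) {
        completed = new boolean[rails];
        for (int i = 0; i < rails; i++) {
            received.add(new ArrayList<>());
        }
    }

    // Models the described behavior: one rail's cancel cancels the shared source.
    void cancelFromRail(int rail) {
        cancelled = true;
    }

    // Emits items 0..count-1. failingRail cancels as soon as it receives an item,
    // like a throwing doOnNext cancelling its rail upstream (-1 = no failing rail).
    void run(int count, int failingRail) {
        int rails = completed.length;
        for (int item = 0; item < count && !cancelled; item++) {
            int rail = item % rails;
            received.get(rail).add(item);
            if (rail == failingRail) {
                cancelFromRail(rail);
            }
        }
        // Completion is only signalled to the rails if the source was never cancelled.
        if (!cancelled) {
            for (int i = 0; i < rails; i++) {
                completed[i] = true;
            }
        }
    }
}
```

With two rails and rail 0 failing on its first item, rail 1 receives nothing and never completes, which matches the "no more progress" situation described above.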
Note that tracking rail cancellation is possible, but it has implications to consider. For instance, if the second rail were to continue emitting in the above example, it would also trigger an error, which would then be combined into an Exceptions.multiple. This is probably acceptable but needs a deeper review.
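Tracking rail cancellation could look roughly like the following reference-counting sketch, assuming each rail reports its own cancel by index. `RailCancellationTracker` and `cancelRail` are hypothetical names, not part of Reactor. The sketch only cancels the shared upstream once every rail has cancelled; it leaves aside the question of combining errors from rails that keep emitting (e.g. into an Exceptions.multiple).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch (not Reactor's API): cancel the shared upstream only
// after every rail has cancelled, instead of on the first rail's cancel.
class RailCancellationTracker {
    private final AtomicIntegerArray railCancelled; // 0 = active, 1 = cancelled
    private final AtomicInteger activeRails;
    private final Runnable cancelUpstream;

    RailCancellationTracker(int rails, Runnable cancelUpstream) {
        this.railCancelled = new AtomicIntegerArray(rails);
        this.activeRails = new AtomicInteger(rails);
        this.cancelUpstream = cancelUpstream;
    }

    // Called when a single rail cancels; repeated calls for the same rail are ignored.
    void cancelRail(int rail) {
        if (railCancelled.compareAndSet(rail, 0, 1)
                && activeRails.decrementAndGet() == 0) {
            // The last active rail is gone: only now cancel the shared upstream.
            cancelUpstream.run();
        }
    }

    boolean isUpstreamCancelled() {
        return activeRails.get() == 0;
    }
}
```

The per-rail compareAndSet makes a repeated cancel from the same rail harmless, so the counter cannot reach zero while another rail is still active.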
Just stumbled over this. This seems like a serious issue with ParallelFlux to me. At the very least, I would expect the docs to state clearly that exceptions need special care when working with parallel fluxes.