reactor-core
Race condition in .bufferUntil causing "Could not emit buffer due to lack of requests"
bufferUntil throws an OverflowException: Could not emit buffer due to lack of requests
This is most likely related to, or the same as, issue #1937, but this time the upstream is multi-threaded as well.
Expected Behavior
Actual Behavior
Exception in thread "main" reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:321)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:227)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:200)
Steps to Reproduce
@Test
public void bufferUntilRaceCondition() {
    Mono<Void> source = Flux.range(0, 1000)
        .flatMap(v -> Flux.range(0, 1000).subscribeOn(Schedulers.parallel()))
        .bufferUntil(v -> true)
        .flatMap(v -> Mono.delay(Duration.ofMillis(1)))
        .then();
    StepVerifier.create(source).verifyComplete();
}
Possible Solution
Your Environment
- Reactor version(s) used: 3.4.15
- JVM version (java -version): JDK 11
- OS and version (eg uname -a): Ubuntu 18.04 (Bionic)
~~Note that this did not happen in 3.4.14.~~ ... it turns out I was just unlucky when first trying to reproduce it with 3.4.14.
Thank you for looking into it!
To be clear, the code snippet is a very artificial one.
To bring it closer to my actual use case, below is a slightly modified version, which still produces the OverflowException: Could not emit buffer due to lack of requests, on my system.
- Reactor version(s) used: 3.4.19
- JVM version (java -version): JDK 11
@Test
public void bufferUntilRaceCondition() {
    int chunk = 1000;
    Mono<Void> source = Flux.range(0, 1000)
        .flatMapSequential(v -> Flux.range(v * chunk, chunk).subscribeOn(Schedulers.parallel()))
        .bufferUntil(v -> v % 100 != 0)
        .flatMap(v -> Mono.delay(Duration.ofMillis(1)))
        .then();
    StepVerifier.create(source).verifyComplete();
}
@rtroilo the current bufferUntil implementation lacks backpressure communication, so the observed issue is expected. I have created a separate issue, #3106, so this is going to be fixed as part of that effort. Stay tuned.
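For readers unfamiliar with what "lack of requests" means here, the following is a minimal, single-threaded toy model of demand accounting. It is not Reactor's FluxBufferPredicate and makes no claim about where the real race occurs. The class ToyBufferUntil and the helper overflows are hypothetical names invented for this sketch. It only shows the general idea: a finished buffer can be emitted only while the downstream still has outstanding demand, and an operator that receives more items than that demand covers has nowhere to put the next buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: NOT Reactor's implementation. All names are hypothetical.
public class DemandSketch {

    /** Collects items into buffers and closes a buffer when the predicate matches. */
    static final class ToyBufferUntil<T> {
        private final Predicate<T> closeBuffer;
        private final List<List<T>> emitted = new ArrayList<>();
        private List<T> current = new ArrayList<>();
        private long requested; // outstanding downstream demand, counted in buffers

        ToyBufferUntil(Predicate<T> closeBuffer) {
            this.closeBuffer = closeBuffer;
        }

        /** Downstream signals that it can accept n more buffers. */
        void request(long n) {
            requested += n;
        }

        /** Upstream pushes one item; emitting a finished buffer consumes one unit of demand. */
        void onNext(T item) {
            current.add(item);
            if (!closeBuffer.test(item)) {
                return; // buffer stays open, nothing to emit yet
            }
            if (requested == 0) {
                // Same message as in the report, raised here with a plain JDK exception.
                throw new IllegalStateException("Could not emit buffer due to lack of requests");
            }
            requested--;
            emitted.add(current);
            current = new ArrayList<>();
        }

        List<List<T>> emitted() {
            return emitted;
        }
    }

    /** Requests `demand` buffers, then pushes `items` items; returns true if the toy operator overflows. */
    static boolean overflows(long demand, int items) {
        // v -> true closes a buffer on every item, like the first reproducer in this thread.
        ToyBufferUntil<Integer> op = new ToyBufferUntil<>(v -> true);
        op.request(demand);
        try {
            for (int i = 0; i < items; i++) {
                op.onNext(i);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflows(2, 2)); // demand covers every buffer
        System.out.println(overflows(2, 3)); // third buffer has no demand left
    }
}
```

In this model the overflow appears as soon as the upstream delivers one more buffer-closing item than the downstream has requested. Proper backpressure communication means the operator never asks the upstream for more items than the remaining downstream demand can absorb.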
Closing as superseded by #3106