Race condition in .bufferUntil causing "Could not emit buffer due to lack of requests"

Open rtroilo opened this issue 2 years ago • 3 comments

bufferUntil throws an OverflowException: Could not emit buffer due to lack of requests. Most likely this is related to, or the same as, issue #1937, but this time the upstream is multi-threaded as well.

Expected Behavior

Actual Behavior

Exception in thread "main" reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
	at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:321)
	at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:227)
	at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:200)

Steps to Reproduce

  @Test
  public void bufferUntilRaceCondition() {
    Mono<Void> source = Flux.range(0, 1000)
        .flatMap(v -> Flux.range(0, 1000).subscribeOn(Schedulers.parallel()))
        .bufferUntil(v -> true)
        .flatMap(v -> Mono.delay(Duration.ofMillis(1)))
        .then();

    StepVerifier.create(source).verifyComplete();
  }

Possible Solution

Your Environment

  • Reactor version(s) used: 3.4.15
  • JVM version (java -version): JDK 11
  • OS and version (eg uname -a): Ubuntu 18.04 (Bionic)

rtroilo avatar Feb 28 '22 18:02 rtroilo

~~Note that this did not happen in 3.4.14.~~ ... note that I was just unlucky when first trying to reproduce it with 3.4.14...

yamass avatar Jul 04 '22 14:07 yamass

> ~~Note that this did not happen in 3.4.14.~~ ... note that I was just unlucky when first trying to reproduce it with 3.4.14...

Thank you for looking into it!

Just to be clear, the code snippet above is a very artificial one. To bring it closer to my actual use case, below is a slightly modified version, which still produces the OverflowException: Could not emit buffer due to lack of requests, on my system.

  • Reactor version(s) used: 3.4.19
  • JVM version (java -version): JDK 11

@Test
public void bufferUntilRaceCondition() {
  int chunk = 1000;

  Mono<Void> source = Flux.range(0, 1000)
      .flatMapSequential(v -> Flux.range(v * chunk, chunk).subscribeOn(Schedulers.parallel()))
      .bufferUntil(v -> v % 100 != 0)
      .flatMap(v -> Mono.delay(Duration.ofMillis(1)))
      .then();

  StepVerifier.create(source).verifyComplete();
}

rtroilo avatar Jul 05 '22 09:07 rtroilo

@rtroilo there is a lack of backpressure communication in the current bufferUntil implementation, so the observed issue is expected. I have created #3106 separately, so this is going to be fixed as part of that effort. Stay tuned.
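
To make the failing condition concrete, here is a toy model in plain Java (no Reactor dependency) of a predicate-based buffering stage. It is only a sketch under stated assumptions: the class BufferDemandModel and its methods are hypothetical and are not Reactor's FluxBufferPredicate, and the model is single-threaded, so it does not reproduce the race itself. It only shows how a completed buffer can arrive when downstream demand, counted in buffers, has already been used up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy, single-threaded model of a predicate-based buffering stage.
// Hypothetical illustration only; this is NOT Reactor's FluxBufferPredicate.
public class BufferDemandModel {
    private final Predicate<Integer> closesBuffer;        // true when the item ends the current buffer
    private final Consumer<List<Integer>> downstream;     // receives completed buffers
    private long requested = 0;                           // outstanding downstream demand, in buffers
    private List<Integer> buffer = new ArrayList<>();

    public BufferDemandModel(Predicate<Integer> closesBuffer,
                             Consumer<List<Integer>> downstream) {
        this.closesBuffer = closesBuffer;
        this.downstream = downstream;
    }

    // Downstream signals that it can accept n more buffers.
    public void request(long n) {
        requested += n;
    }

    // Upstream pushes one item, regardless of how many buffers downstream asked for.
    public void onNext(int value) {
        buffer.add(value);
        if (!closesBuffer.test(value)) {
            return; // buffer still open, nothing to emit yet
        }
        if (requested == 0) {
            // A buffer is complete, but nobody asked for it.
            throw new IllegalStateException("Could not emit buffer due to lack of requests");
        }
        requested--;
        List<Integer> full = buffer;
        buffer = new ArrayList<>();
        downstream.accept(full);
    }

    public static void main(String[] args) {
        List<List<Integer>> received = new ArrayList<>();
        BufferDemandModel stage = new BufferDemandModel(v -> true, received::add);

        stage.request(1);   // downstream wants exactly one buffer
        stage.onNext(1);    // completes buffer [1], which consumes that demand
        System.out.println("emitted: " + received);

        try {
            stage.onNext(2); // completes a second buffer with zero demand left
        } catch (IllegalStateException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

In this model the second item fails because upstream delivery is not tied to downstream demand. With a multi-threaded upstream, as in the snippets in this thread, the same condition would presumably be reached only intermittently, which would match the flaky reproduction reported earlier.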

OlegDokuka avatar Jul 05 '22 09:07 OlegDokuka

Closing as superseded by #3106

chemicL avatar Mar 18 '24 13:03 chemicL