
OverflowException is thrown with bufferTimeout and delayElements operators

Open cision-batag opened this issue 2 years ago • 7 comments

We have implemented a rate limiter using bufferTimeout and delayElements (see the TestRateLimiter class in the code below).

Expected Behavior

No exceptions are thrown.

Actual Behavior

The following exception is thrown:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

According to my tests it is happening when:

  • the consumer is slower than the producer
  • the producer is slow enough that the buffer timeout in the rate limiter fires before the max buffer size is reached

Based on that, I suspect the problem is caused by multiple threads concurrently pushing buffers downstream, which can get out of sync. Moving to single-threaded processing in the rate limiter solved the issue.

Steps to Reproduce

The following unit test can be used to reproduce the issue

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);

        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));

        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
                .transform(rateLimiter::limitRate)
                .doOnError(error -> LOGGER.error(error.getMessage()))
                .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
                .expectNextCount(MESSAGE_COUNT)
                .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                    .mapToObj(index -> "message" + index)
                    .forEach(message -> {
                        sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                        sink.next(message);
                    });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {
        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                    .bufferTimeout(limitForDuration, duration)
                    .delayElements(duration)
                    .flatMap(Flux::fromIterable);
        }
    }
}

Possible Solution

As a workaround, passing Schedulers.single() to the bufferTimeout and delayElements operators seems to resolve the issue:

.bufferTimeout(limitForDuration, duration, Schedulers.single())
.delayElements(duration, Schedulers.single())
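
For reference, the full limitRate method with this workaround looks like the following (just a sketch, assuming the rest of TestRateLimiter stays as posted above):

// requires: import reactor.core.scheduler.Schedulers;
public Flux<String> limitRate(Flux<String> flux) {
    return flux
            // emit a buffer when it is full or when the timeout fires,
            // with the timer running on the shared single-threaded scheduler
            .bufferTimeout(limitForDuration, duration, Schedulers.single())
            // delay each emitted buffer on the same single-threaded scheduler
            .delayElements(duration, Schedulers.single())
            .flatMap(Flux::fromIterable);
}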

Your Environment

  • Reactor version(s) used: 3.4.17, 3.4.7
  • JVM version (java -version): openjdk version "11.0.14" 2022-01-18 LTS OpenJDK Runtime Environment Zulu11.54+23-CA (build 11.0.14+9-LTS) OpenJDK 64-Bit Server VM Zulu11.54+23-CA (build 11.0.14+9-LTS, mixed mode)
  • OS and version (eg uname -a): Darwin Kernel Version 21.4.0: Mon Feb 21 20:35:58 PST 2022; root:xnu-8020.101.4~2/RELEASE_ARM64_T6000 arm64

cision-batag avatar Apr 12 '22 15:04 cision-batag

This is unfortunately precisely one limitation of buffer-with-timeout: the time-related aspect is a hard constraint (I want to emit a buffer every x time period, no matter what) that doesn't play well with slow downstream consumers and backpressure. If the downstream doesn't make enough requests, there's no generic way of pausing time...

I'm not sure there is anything we can do to efficiently fix that limitation, unfortunately 😞

NB: windowTimeout also suffers from the same limitation, only worse (because with windows the elements must be propagated downstream as fast and as close to real time as possible). There is ongoing work on windowTimeout to try and improve the situation, but it has already taken months of effort...

simonbasle avatar May 23 '22 15:05 simonbasle

Alternate solution
I think executing the downstream on a different thread (or threads) solves this. In my view this won't impact performance either:

public Flux<String> limitRate(Flux<String> flux) {
    return flux
            .bufferTimeout(limitForDuration, duration)
            .delayElements(duration)
            .publishOn(Schedulers.newParallel("testing", 5))
            // .publishOn(Schedulers.single())
            .flatMap(Flux::fromIterable);
}

srihariprasadCision avatar May 27 '22 01:05 srihariprasadCision

@cision-batag we implemented the required mechanism in windowTimeout, so you can now replace your bufferTimeout with equivalent code using windowTimeout:

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);

        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));

        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
                .transform(rateLimiter::limitRate)
                .doOnError(error -> LOGGER.error(error.getMessage()))
                .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
                .expectNextCount(MESSAGE_COUNT)
                .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                    .mapToObj(index -> "message" + index)
                    .forEach(message -> {
                        sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                        sink.next(message);
                    });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {
        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                    .windowTimeout(limitForDuration, duration)
                    .concatMap(f -> f.collectList().delayElement(duration), 0)
                    .flatMap(Flux::fromIterable);
        }
    }
}

OlegDokuka avatar Jun 13 '22 07:06 OlegDokuka

@OlegDokuka Is there a solution to this? Or any chance of getting a list out of the windowTimeout()?

kfoekao avatar Aug 15 '22 11:08 kfoekao

@kfoekao windowTimeout + concatMap(f -> f.collectList(), 0) should be a replacement

OlegDokuka avatar Aug 15 '22 11:08 OlegDokuka

@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior? Also, when I tried this method, I got an error of reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...). Do you have an idea why this would happen?

almogtavor avatar Aug 21 '22 22:08 almogtavor

@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior?

0 means concatMap does not prefetch extra elements, so it processes exactly 1 window at a time; thus only 1 window is collecting values into a list, which is then propagated downstream.
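
In other words, the chain behaves roughly like this (the same pattern as above, annotated; flux, limitForDuration and duration refer to the reproducer code):

flux
        // open a new window when limitForDuration elements have arrived
        // or duration has elapsed, whichever comes first
        .windowTimeout(limitForDuration, duration)
        // prefetch = 0: the next window is requested only after the previous
        // collected list has been emitted downstream, so exactly one window
        // is collecting values at any time
        .concatMap(window -> window.collectList().delayElement(duration), 0)
        // flatten each List<String> back into individual elements
        .flatMap(Flux::fromIterable);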

Also, when I tried this method, I got an error of reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...). Do you have an idea why this would happen?

Feel free to create an issue if you have a reproducer

OlegDokuka avatar Aug 22 '22 05:08 OlegDokuka

@kfoekao windowTimeout + concatMap(f -> f.collectList(), 0) should be a replacement

Just a heads up - the workaround replacement is much slower. For my use case, windowUntilChanged + concatMap was roughly 5-6 times slower than bufferUntilChanged (in the cases when it did not crash, obviously).

janvojt avatar Nov 23 '22 10:11 janvojt

@janvojt

roughly 5-6 times slower than bufferUntilChanged (in the cases when it did not crash, obviously).

That is expected; you have more overhead with the new guarantees. Also, just a heads-up: @chemicL is working on porting the windowTimeout functionality to bufferTimeout, which could be faster since there is no extra coordination as in the case of a window, which represents events as a Flux.

OlegDokuka avatar Nov 24 '22 15:11 OlegDokuka

Will the bufferTimeout backpressure support be released soon? I read that there is a workaround with windowTimeout, but I think it should also be implemented in bufferTimeout (I'm using it for batching database access).

bensilvan avatar Dec 17 '22 03:12 bensilvan

@bensilvan we are working on it!
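
In the meantime, the windowTimeout workaround above can be adapted to database batching along these lines (just a sketch; the saveBatch function stands in for your actual database write):

import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public <T> Flux<Void> writeInBatches(Flux<T> source, int batchSize, Duration maxWait,
                                     Function<List<T>, Mono<Void>> saveBatch) {
    return source
            // cut a window after batchSize elements or after maxWait, whichever comes first
            .windowTimeout(batchSize, maxWait)
            // collect each window into a List and write it as one batch;
            // prefetch = 0 keeps at most one batch in flight at a time
            .concatMap(window -> window.collectList().flatMap(saveBatch), 0);
}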

OlegDokuka avatar Dec 23 '22 18:12 OlegDokuka

Encountering this, adding myself here :)

patpatpat123 avatar Mar 24 '23 03:03 patpatpat123

Encountering the same issue; I tried the proposed windowTimeout alternative and got the same error as @almogtavor:

2023-04-08 23:31:37.832 ERROR   --- [-367412941877-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:236)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.onNext(FluxWindowTimeout.java:1901)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:567)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:652)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:692)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:211)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
	at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onNext(FluxUsing.java:353)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runBackfused(FluxPublishOn.java:484)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:521)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorTrackedRunnable.run(ExecutorScheduler.java:192)
	at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
	at reactor.core.scheduler.SingleWorkerScheduler.execute(SingleWorkerScheduler.java:64)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:252)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:282)
	at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
	at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:237)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:371)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Messages are emitted by a reactor-kafka source.

Sebruck avatar Apr 08 '23 21:04 Sebruck

I also get this:

... ERROR reactor.core.publisher.Operators Operator called default onErrorDropped
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)

with Reactor 3.5.4 when the code looks like:

Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Disposable sinkSubscription = sink.asFlux()
  .bufferTimeout(10, ofMillis(1000))
  .publishOn(Schedulers.newSingle("..."))
  .flatMap(this::doSomething)
  .onErrorContinue((e, o) -> log.error(e))
  .subscribe();

I see a few differences from the originally posted issue:

  • I use a sink
  • no delayElements in my case
  • publishOn to another thread

+1 to solving this

vsadokhin avatar Apr 13 '23 07:04 vsadokhin

This issue should be resolved by #3332. Please test and open a new issue in case of problems.

chemicL avatar Jun 20 '23 10:06 chemicL