reactor-core
OverflowException is thrown with bufferTimeout and delayElements operators
We have implemented a rate limiter using bufferTimeout and delayElements (see the TestRateLimiter class in the code below).
Expected Behavior
No exceptions are thrown.
Actual Behavior
The following exception is thrown:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
According to my tests, it happens when:
- the consumer is slower than the producer
- the producer is slow enough that the rate limiter's buffer timeout fires before the buffer reaches its max size
Based on that, I suspect the problem is caused by multiple threads concurrently pushing buffers downstream, which may get out of sync. Moving to single-threaded processing in the rate limiter solved the issue.
Steps to Reproduce
The following unit test can be used to reproduce the issue
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);
        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));
        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
            .transform(rateLimiter::limitRate)
            .doOnError(error -> LOGGER.error(error.getMessage()))
            .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
            .expectNextCount(MESSAGE_COUNT)
            .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                .mapToObj(index -> "message" + index)
                .forEach(message -> {
                    sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                    sink.next(message);
                });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {

        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                .bufferTimeout(limitForDuration, duration)
                .delayElements(duration)
                .flatMap(Flux::fromIterable);
        }
    }
}
Possible Solution
As a workaround, passing Schedulers.single() to the bufferTimeout and delayElements operators seems to resolve the issue:
.bufferTimeout(limitForDuration, duration, Schedulers.single())
.delayElements(duration, Schedulers.single())
Your Environment
- Reactor version(s) used: 3.4.17, 3.4.7
- JVM version (java -version): openjdk version "11.0.14" 2022-01-18 LTS, OpenJDK Runtime Environment Zulu11.54+23-CA (build 11.0.14+9-LTS), OpenJDK 64-Bit Server VM Zulu11.54+23-CA (build 11.0.14+9-LTS, mixed mode)
- OS and version (e.g. uname -a): Darwin Kernel Version 21.4.0: Mon Feb 21 20:35:58 PST 2022; root:xnu-8020.101.4~2/RELEASE_ARM64_T6000 arm64
This is unfortunately precisely one limitation of buffer-with-timeout: the time-related aspect is a hard constraint ("I want to emit a buffer every x time period, no matter what") that doesn't play well with slow downstream consumers and backpressure.
If the downstream doesn't make enough requests, there's no generic way of pausing time...
I'm not sure there is anything we can do to efficiently fix that limitation, unfortunately 😞
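To make the limitation concrete, here is a minimal sketch (not from the thread), assuming reactor-core 3.4.x or 3.5.x on the classpath; the class BufferTimeoutOverflowDemo is a hypothetical name. A subscriber requests a single buffer and then never requests again. Because elements arrive slowly relative to the 50 ms timeout, the timer fires a second time while demand is zero, and the plain bufferTimeout operator is expected to terminate with an OverflowException ("Could not emit buffer due to lack of requests").

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core 3.4.x/3.5.x (default, non-fair bufferTimeout).
public class BufferTimeoutOverflowDemo {

    /** Requests exactly one buffer, never requests again, and returns the terminal error (or null). */
    public static Throwable runWithSingleRequest() {
        CountDownLatch terminated = new CountDownLatch(1);
        AtomicReference<Throwable> error = new AtomicReference<>();

        BaseSubscriber<List<Long>> stalledConsumer = new BaseSubscriber<List<Long>>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // demand for a single buffer only
            }

            @Override
            protected void hookOnNext(List<Long> buffer) {
                // no further request(): simulates a consumer that is still busy
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                error.set(throwable);
                terminated.countDown();
            }

            @Override
            protected void hookOnComplete() {
                terminated.countDown();
            }
        };

        Flux.interval(Duration.ofMillis(10))            // producer: one tick every 10 ms
            .bufferTimeout(100, Duration.ofMillis(50))  // the timeout fires long before 100 items
            .subscribe(stalledConsumer);

        try {
            terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stalledConsumer.dispose(); // stop the interval if nothing terminated it
        }
        return error.get();
    }

    public static void main(String[] args) {
        Throwable t = runWithSingleRequest();
        System.out.println("terminal error: " + t);
        System.out.println("is overflow: " + (t != null && Exceptions.isOverflow(t)));
    }
}
```

The timer keeps firing regardless of demand, which is the hard constraint described above; a subscriber that calls request(1) again in hookOnNext (a fast consumer) should not hit the error.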
NB: windowTimeout also suffers from the same limitation, only worse (because with windows, elements must be propagated downstream as fast and as close to real time as possible). There is ongoing work on windowTimeout to try to improve the situation, but it has already taken months of effort...
Alternate solution
I think the solution is to execute the downstream on a different thread (or threads).
In my view, this won't have an impact on performance either:
public Flux<String> limitRate(Flux<String> flux) {
    return flux
        .bufferTimeout(limitForDuration, duration)
        .delayElements(duration)
        .publishOn(Schedulers.newParallel("testing", 5))
        // .publishOn(Schedulers.single())
        .flatMap(Flux::fromIterable);
}
@cision-batag we implemented the required mechanism in windowTimeout, so you can now replace your bufferTimeout with equivalent code based on windowTimeout:
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);
        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));
        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
            .transform(rateLimiter::limitRate)
            .doOnError(error -> LOGGER.error(error.getMessage()))
            .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
            .expectNextCount(MESSAGE_COUNT)
            .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                .mapToObj(index -> "message" + index)
                .forEach(message -> {
                    sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                    sink.next(message);
                });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {

        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                .windowTimeout(limitForDuration, duration)
                .concatMap(f -> f.collectList().delayElement(duration), 0)
                .flatMap(Flux::fromIterable);
        }
    }
}
@OlegDokuka Is there a solution to this? Or any chance of getting a list out of windowTimeout()?
@kfoekao windowTimeout + concatMap(f.collectList, 0) should be a replacement
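A sketch of that replacement as a reusable method, assuming a reactor-core version whose windowTimeout is backpressure-aware and whose concatMap accepts a prefetch of 0 (the workaround above relies on both). WindowTimeoutBatching and batch are hypothetical names. The final filter is an assumption of this sketch: windowTimeout may emit empty windows (for example when a timeout fires with no elements), which collect to empty lists, whereas bufferTimeout does not emit empty buffers.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper, not part of Reactor's API.
public final class WindowTimeoutBatching {

    private WindowTimeoutBatching() {
    }

    /** Emits List batches of at most maxSize elements, or whatever arrived within maxTime. */
    public static <T> Flux<List<T>> batch(Flux<T> source, int maxSize, Duration maxTime) {
        return source
            .windowTimeout(maxSize, maxTime)
            // prefetch 0: only one window is open and being collected at any time
            .concatMap(window -> window.collectList(), 0)
            // assumption: drop empty windows so the output resembles bufferTimeout
            .filter(list -> !list.isEmpty());
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = batch(Flux.range(1, 10), 4, Duration.ofSeconds(5))
            .collectList()
            .block();
        System.out.println(batches);
    }
}
```

Compared to bufferTimeout, this trades some throughput (each window is a Flux with its own coordination) for demand-aware behavior.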
@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior?
Also, when I tried this method, I got the error reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...).
Do you have an idea why this would happen?
@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior?
A prefetch of 0 means concatMap does not prefetch extra elements, so it handles exactly one window at a time: only one window is collecting values into a list, which is then propagated downstream.
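The effect of the prefetch argument can be observed by logging the requests that reach the source. This is a hedged sketch assuming a reactor-core version in which concatMap accepts a prefetch of 0; ConcatMapPrefetchDemo and upstreamRequests are hypothetical names. With a prefetch of 32, the first upstream request should be 32; with a prefetch of 0, concatMap should only ever request one element (in the workaround, one window) at a time.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; assumes concatMap(mapper, 0) is supported by your reactor-core version.
public class ConcatMapPrefetchDemo {

    /** Records every request(n) that concatMap sends to its source for the given prefetch. */
    public static List<Long> upstreamRequests(int prefetch) {
        List<Long> requests = new CopyOnWriteArrayList<>();
        Flux.range(1, 5)
            .hide()                              // disable operator fusion so request(n) calls are visible
            .doOnRequest(n -> requests.add(n))   // log each request(n) that reaches the source
            .concatMap(i -> Mono.just(i), prefetch)
            .blockLast();
        return requests;
    }

    public static void main(String[] args) {
        System.out.println("prefetch 32: " + upstreamRequests(32));
        System.out.println("prefetch 0:  " + upstreamRequests(0));
    }
}
```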
Also, when I tried this method, I got the error reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...). Do you have an idea why this would happen?
Feel free to create an issue if you have a reproducer.
@kfoekao windowTimeout + concatMap(f.collectList, 0) should be a replacement
Just a heads-up: the workaround replacement is much slower. For my use case, windowUntilChanged + concatMap was roughly 5-6 times slower than bufferUntilChanged (in the cases when it did not crash, obviously).
@janvojt
roughly 5-6 times slower than bufferUntilChanged (in the cases when it did not crash obviously..).
That is expected: the new guarantees come with more overhead. Also, just a heads-up: @chemicL is working on porting the windowTimeout functionality to bufferTimeout, which could be faster since it avoids the extra coordination needed for windows, which represent events as a Flux.
Will the bufferTimeout backpressure support be released soon? I read that there is a workaround with windowTimeout, but I think it should also be implemented in bufferTimeout (I'm using it to batch database access).
@bensilvan we are working on it!
Encountering this too, adding myself here :)
I'm encountering the same issue and tried the proposed windowTimeout alternative, but got the same error as @almogtavor:
2023-04-08 23:31:37.832 ERROR --- [-367412941877-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:236)
at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.onNext(FluxWindowTimeout.java:1901)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:567)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:652)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:692)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:211)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onNext(FluxUsing.java:353)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runBackfused(FluxPublishOn.java:484)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:521)
at reactor.core.scheduler.ExecutorScheduler$ExecutorTrackedRunnable.run(ExecutorScheduler.java:192)
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.scheduler.SingleWorkerScheduler.execute(SingleWorkerScheduler.java:64)
at reactor.core.scheduler.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:252)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:282)
at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:237)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:371)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Messages are emitted by a reactor-kafka source.
I also get this:
... ERROR reactor.core.publisher.Operators Operator called default onErrorDropped
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:829)
with Reactor 3.5.4, when the code looks like this:
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Disposable sinkSubscription = sink.asFlux()
    .bufferTimeout(10, ofMillis(1000))
    .publishOn(Schedulers.newSingle("..."))
    .flatMap(this::doSomething)
    .onErrorContinue((e, o) -> log.error(e))
    .subscribe();
I see a few differences from the originally posted issue:
- I use a sink
- no delayElements in my case
- publishOn another thread
+1 to solving this
This issue should be resolved by #3332. Please test and open a new issue in case of problems.
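A sketch of how the fixed operator might be used, assuming your reactor-core version provides the bufferTimeout(int maxSize, Duration maxTime, boolean fairBackpressure) overload (check the Flux javadoc of your version); the link between that overload and #3332 is an assumption of this sketch, and FairBufferTimeoutDemo is a hypothetical name. With fairBackpressure set to true, the operator is meant to hold a pending buffer until the subscriber requests more, rather than failing with OverflowException.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; ASSUMES the bufferTimeout(int, Duration, boolean) overload exists.
public class FairBufferTimeoutDemo {

    /** Same batching as bufferTimeout(maxSize, maxTime), using the backpressure-aware variant. */
    public static <T> Flux<List<T>> batch(Flux<T> source, int maxSize, Duration maxTime) {
        return source.bufferTimeout(maxSize, maxTime, true); // true = fairBackpressure
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = batch(Flux.range(1, 10), 4, Duration.ofSeconds(5))
            .collectList()
            .block();
        System.out.println(batches);
    }
}
```

In the original rate limiter, this would mean swapping .bufferTimeout(limitForDuration, duration) for .bufferTimeout(limitForDuration, duration, true), keeping the rest of the chain unchanged.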