windowTimeout - Flux disposed prematurely
When using windowTimeout(maxElements, duration) in combination with concat/concatWith with the following preconditions:
- the number of concatenated elements is greater than
maxElements, durationis small and the following operation rather slow.
Then, the following exception is thrown (and not all concatenated elements are processed):
11:09:26.874 [parallel-1] DEBUG reactor.core.publisher.Operators - onNextDropped: UnicastProcessor
11:09:26.879 [parallel-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.failWithRejected(Exceptions.java:249)
at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:807)
at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:188)
at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.drainLoop(FluxWindowTimeout.java:385)
at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber$ConsumerIndexHolder.run(FluxWindowTimeout.java:440)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.<clinit>(Exceptions.java:502)
at reactor.core.scheduler.Schedulers.workerSchedulePeriodically(Schedulers.java:959)
at reactor.core.scheduler.ExecutorServiceWorker.schedulePeriodically(ExecutorServiceWorker.java:56)
at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:184)
... 10 common frames omitted
Steps to Reproduce
This seems to be a time-dependent problem. Thus, it does not occur every time. On my machine, it occurred in 9 out of 10 case in this simplified example.
@Test
public void repoCase() {
final var disposable = Flux
.just(0)
.concatWith(Flux.fromStream(IntStream
.range(1, 1000)
.boxed()))
.windowTimeout(100, Duration.ofMillis(1))
.flatMap(integerFlux -> integerFlux.compose(this::slowOperation))
.subscribe();
while (!disposable.isDisposed()) {
LockSupport.parkNanos(100 * 1000 * 1000);
}
}
private Flux<Double> slowOperation(final Flux<Integer> flux) {
return flux.map(integer -> {
double sum = 0;
for (int counter = 1; counter < 10000; counter++) {
sum += Math.pow(-1, counter + 1) / ((2 * counter) - 1);
}
return sum;
});
}
Possible Solution
For now, I use bufferTimeout instead. This is not ideal as buffering is not necessary but it works reliably.
Your Environment
- Reactor version(s) used: 3.2.11.RELEASE
- Other relevant libraries versions (eg.
netty, ...): - JVM version (
javar -version): OpenJDK 11.0.1 - OS and version (eg
uname -a): macOS 10.14.6
I am also noticing the same stack trace intermittently when using windowTimeout().
- JVM version (javar -version): OpenJDK 11.0.1
- Reactor version: 3.3.0.RELEASE
- OS: Windows 10
@kbeineke can you please check if this issue reproduces after the #3054. Thanks
closing since can not reproduce it with windowTimeout with fair back pressure