reactor-core icon indicating copy to clipboard operation
reactor-core copied to clipboard

windowTimeout - Flux disposed prematurely

Open kbeineke opened this issue 6 years ago • 1 comments

When using windowTimeout(maxElements, duration) in combination with concat/concatWith with the following preconditions:

  • the number of concatenated elements is greater than maxElements,
  • duration is small and the following operation rather slow.

Then, the following exception is thrown (and not all concatenated elements are processed):

11:09:26.874 [parallel-1] DEBUG reactor.core.publisher.Operators - onNextDropped: UnicastProcessor
11:09:26.879 [parallel-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
	at reactor.core.Exceptions.failWithRejected(Exceptions.java:249)
	at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:807)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:188)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.drainLoop(FluxWindowTimeout.java:385)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber$ConsumerIndexHolder.run(FluxWindowTimeout.java:440)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
	at reactor.core.Exceptions.<clinit>(Exceptions.java:502)
	at reactor.core.scheduler.Schedulers.workerSchedulePeriodically(Schedulers.java:959)
	at reactor.core.scheduler.ExecutorServiceWorker.schedulePeriodically(ExecutorServiceWorker.java:56)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:184)
	... 10 common frames omitted

Steps to Reproduce

This seems to be a time-dependent problem. Thus, it does not occur every time. On my machine, it occurred in 9 out of 10 case in this simplified example.

@Test
public void repoCase() {

    final var disposable = Flux
        .just(0)
        .concatWith(Flux.fromStream(IntStream
            .range(1, 1000)
            .boxed()))
        .windowTimeout(100, Duration.ofMillis(1))
        .flatMap(integerFlux -> integerFlux.compose(this::slowOperation))
        .subscribe();

    while (!disposable.isDisposed()) {
        LockSupport.parkNanos(100 * 1000 * 1000);
    }
}

private Flux<Double> slowOperation(final Flux<Integer> flux) {
    return flux.map(integer -> {
        double sum = 0;
        for (int counter = 1; counter < 10000; counter++) {
            sum += Math.pow(-1, counter + 1) / ((2 * counter) - 1);
        }
        return sum;
    });
}

Possible Solution

For now, I use bufferTimeout instead. This is not ideal as buffering is not necessary but it works reliably.

Your Environment

  • Reactor version(s) used: 3.2.11.RELEASE
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (javar -version): OpenJDK 11.0.1
  • OS and version (eg uname -a): macOS 10.14.6

kbeineke avatar Sep 19 '19 10:09 kbeineke

I am also noticing the same stack trace intermittently when using windowTimeout().

  • JVM version (javar -version): OpenJDK 11.0.1
  • Reactor version: 3.3.0.RELEASE
  • OS: Windows 10

srnagar avatar Apr 06 '20 00:04 srnagar

@kbeineke can you please check if this issue reproduces after the #3054. Thanks

OlegDokuka avatar Dec 06 '22 12:12 OlegDokuka

closing since can not reproduce it with windowTimeout with fair back pressure

OlegDokuka avatar Aug 08 '23 07:08 OlegDokuka