investigate flaky `FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout` raceTest
FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout() FAILED
java.lang.AssertionError: [signalCount2]
Expecting AtomicInteger(99999) to have value:
100000
but did not.
at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)
Interestingly, the combination of replay with refCount might be problematic as pointed out in one summary by Simon. This reflection came a few years after the test was introduced to address #1260
For reference, the output when the test fails:
14:14:32.349 [boundedElastic-1039] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
14:14:32.349 [boundedElastic-1040] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.AssertionError: [signalCount2]
Expecting AtomicInteger(99998) to have value:
100000
but did not.
at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
It feels like at some racy case both Subscribers of FluxRefCountGrace get the same error drop caused by cancellation. What is unexpected though is that both trigger FluxReplay.connect, while the assumption is that only one subscriber makes that call. Perhaps the race is that while the first subscriber is already canceling and the FluxReplay subscription is being disposed, the next subscriber comes, notices that the connection reports it's disposed and initiates a new connection and that is causing issues.
For debugging locally, I added
new RuntimeException("onError called!").printStackTrace();
to FluxRefCountGrace$RefCountInner.onError and got this during a failing run:
java.lang.RuntimeException: onError called!
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplayInner.request(FluxReplay.java:1721)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.request(FluxRefCountGrace.java:311)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.setRefConnection(FluxRefCountGrace.java:236)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:98)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.364 [boundedElastic-71] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
09:12:32.364 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.366 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.AssertionError: [signalCount1]
Expecting AtomicInteger(99997) to have value:
100000
but did not.
at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:298)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
I just wanted to chip in.
When I was doing my changes to the replay-buffer I found that there was a small detail in there that, when not implemented properly in the ReplayBuffer, these tests (this and similar ones across the project) would fail.
See L950: https://github.com/MikkelHJuul/reactor-core/blob/issues/3340-object-array/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java
Thanks @MikkelHJuul – please have a look at the linked PR (#3707), I suppose it incorporates the same cancellation validation, although using Flux#next() which implicitly cancels the source.
@chemicL - I'm not that deeply invested in to this part of these interactions but it looks and sounds reasonable
#3707 resolves this.