investigate flaky `FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout` raceTest

Open OlegDokuka opened this issue 2 years ago • 5 comments

FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout() FAILED
    java.lang.AssertionError: [signalCount2] 
    Expecting AtomicInteger(99999) to have value:
      100000
    but did not.
        at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)

OlegDokuka avatar Nov 13 '23 19:11 OlegDokuka

Interestingly, the combination of replay with refCount might be problematic, as pointed out in one summary by Simon. That observation came a few years after this test was introduced to address #1260.

For reference, the output when the test fails:

14:14:32.349 [boundedElastic-1039] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
14:14:32.349 [boundedElastic-1040] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

java.lang.AssertionError: [signalCount2] 
Expecting AtomicInteger(99998) to have value:
  100000
but did not.

	at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

It feels like in some racy case both subscribers of FluxRefCountGrace get the same dropped error caused by cancellation. What is unexpected, though, is that both trigger FluxReplay.connect, while the assumption is that only one subscriber makes that call. Perhaps the race is this: while the first subscriber is already cancelling and the FluxReplay subscription is being disposed, the next subscriber arrives, notices that the connection reports it is disposed, and initiates a new connection, and that is what causes the issue.
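To make the suspected interleaving concrete, here is a minimal toy model in plain Java. It is not reactor-core code: `ToyConnection`, `ToyRefCount` and `simulateInterleaving` are hypothetical names, and the sketch only assumes that a subscriber which observes a disposed connection creates a fresh one. The steps run sequentially in one fixed order, so it illustrates the hypothesis rather than reproducing the flaky failure or the lost signal.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only: NOT reactor-core code. All names here are hypothetical. */
public class RefCountRaceSketch {

    /** Stand-in for a shared upstream connection that can be disposed. */
    static final class ToyConnection {
        private volatile boolean disposed;

        boolean isDisposed() {
            return disposed;
        }

        void dispose() {
            disposed = true;
        }
    }

    /** Stand-in for a ref-counting operator that reconnects when it sees a disposed connection. */
    static final class ToyRefCount {
        final AtomicInteger connectCalls = new AtomicInteger();
        private ToyConnection current;

        /** Returns the connection a subscriber attaches to, connecting first if needed. */
        ToyConnection subscribe() {
            if (current == null || current.isDisposed()) {
                current = new ToyConnection();
                connectCalls.incrementAndGet();
            }
            return current;
        }
    }

    /** Replays one fixed interleaving step by step and returns the number of connect calls. */
    static int simulateInterleaving() {
        ToyRefCount refCount = new ToyRefCount();
        ToyConnection first = refCount.subscribe(); // subscriber A connects
        first.dispose();                            // A got its item and cancels; the connection is disposed
        refCount.subscribe();                       // subscriber B arrives, sees "disposed", reconnects
        return refCount.connectCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("connect calls: " + simulateInterleaving());
    }
}
```

In this model both subscribers end up calling connect, which matches the two `FluxReplay.connect` frames in the traces above. Whether the real operator takes exactly this path is still an assumption.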

chemicL avatar Jan 11 '24 14:01 chemicL

For debugging locally, I added

new RuntimeException("onError called!").printStackTrace();

to FluxRefCountGrace$RefCountInner.onError and got this during a failing run:

java.lang.RuntimeException: onError called!
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplayInner.request(FluxReplay.java:1721)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.request(FluxRefCountGrace.java:311)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
	at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.setRefConnection(FluxRefCountGrace.java:236)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:98)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.364 [boundedElastic-71] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
09:12:32.364 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.366 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

java.lang.AssertionError: [signalCount1] 
Expecting AtomicInteger(99997) to have value:
  100000
but did not.

	at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:298)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)


chemicL avatar Jan 12 '24 08:01 chemicL

I just wanted to chip in.

While making my changes to the replay buffer, I found a small detail that, when not implemented properly in the ReplayBuffer, causes these tests (this one and similar ones across the project) to fail.

See L950: https://github.com/MikkelHJuul/reactor-core/blob/issues/3340-object-array/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java

MikkelHJuul avatar Feb 13 '24 11:02 MikkelHJuul

Thanks @MikkelHJuul, please have a look at the linked PR (#3707). I suppose it incorporates the same cancellation validation, although it uses Flux#next(), which implicitly cancels the source.
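For readers unfamiliar with the "take the first item, then cancel" behaviour mentioned here, the following is a rough sketch of the pattern using only `java.util.concurrent.Flow`. It is not Reactor's `MonoNext` implementation: `RangeSubscription`, `FirstSubscriber` and `emittedBeforeCancel` are hypothetical names, and demand accounting is deliberately ignored to keep the sketch short.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only: NOT Reactor's MonoNext. All names here are hypothetical. */
public class TakeFirstSketch {

    /** Synchronous source that emits 1..count on the first request, unless cancelled. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int count;
        private final AtomicInteger emitted;
        private volatile boolean cancelled;
        private boolean started;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int count, AtomicInteger emitted) {
            this.downstream = downstream;
            this.count = count;
            this.emitted = emitted;
        }

        @Override
        public void request(long n) {
            if (started) {
                return; // sketch: no demand accounting, later requests are ignored
            }
            started = true;
            for (int i = 1; i <= count && !cancelled; i++) {
                emitted.incrementAndGet();
                downstream.onNext(i);
            }
            if (!cancelled) {
                downstream.onComplete();
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Keeps the first item and cancels upstream from inside onNext. */
    static final class FirstSubscriber implements Flow.Subscriber<Integer> {
        Integer first;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand in this sketch
        }

        @Override
        public void onNext(Integer item) {
            if (first == null) {
                first = item;
                subscription.cancel(); // the implicit cancellation of the source
            }
        }

        @Override
        public void onError(Throwable t) {
            // not exercised in this sketch
        }

        @Override
        public void onComplete() {
            // not exercised when the source is cancelled early
        }
    }

    /** Returns how many items the source emitted before it observed the cancellation. */
    static int emittedBeforeCancel(int count) {
        AtomicInteger emitted = new AtomicInteger();
        FirstSubscriber subscriber = new FirstSubscriber();
        subscriber.onSubscribe(new RangeSubscription(subscriber, count, emitted));
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("emitted before cancel: " + emittedBeforeCancel(10));
    }
}
```

The cancel call happens inside `onNext`, which is the same shape as the `MonoNext$NextSubscriber.onNext` to `RefCountInner.cancel` frames in the stack traces earlier in this thread.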

chemicL avatar Feb 19 '24 12:02 chemicL

@chemicL - I'm not that deeply invested in this part of these interactions, but it looks and sounds reasonable.

MikkelHJuul avatar Feb 21 '24 19:02 MikkelHJuul

#3707 resolves this.

chemicL avatar Jun 21 '24 08:06 chemicL