rsocket-java icon indicating copy to clipboard operation
rsocket-java copied to clipboard

java.util.concurrent.CancellationException: Disposed when disposing flux from channel communication in tests

Open mkrzywanski opened this issue 2 years ago • 5 comments

I have created a sample application which uses channel style communication. Unfortunatelly when cancelling the subscription from the tests I receive an exception :

stacktrace
  022-04-10 20:33:49.106 ERROR 338925 --- [or-http-epoll-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
	at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.2.jar:na]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.16.jar:3.4.16]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.16.jar:3.4.16]
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar:na]
	at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49) ~[rsocket-transport-netty-1.1.1.jar:na]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:823) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Expected Behavior

The exception is not thrown on channel subscription cancellation.

Actual Behavior

Exception is thrown.

Steps to Reproduce

Sample application - https://github.com/mkrzywanski/rsocket-reactive-chat. Just run user1ShouldGetMessagesFromUser2 test.

Your Environment

Library versions are listed in the provided project.

  • Platform (eg. JVM version (javar -version) or Node version (node --version)): openjdk 17.0.1 2021-10-19 OpenJDK Runtime Environment Temurin-17.0.1+12 (build 17.0.1+12) OpenJDK 64-Bit Server VM Temurin-17.0.1+12 (build 17.0.1+12, mixed mode, sharing)

  • OS and version (eg uname -a): Ubunut 20

mkrzywanski avatar Apr 10 '22 18:04 mkrzywanski

@mkrzywanski can your try to add to your main method a line with Hooks.onOperatorDebug to see the full stack trace of the exception

OlegDokuka avatar Apr 10 '22 18:04 OlegDokuka

Since it appears in tests, I have added it in my @BeforeAll method but the stacktrace has not changed, it remains the same. Also the exception seems to be logged multiple times from different threads.

Adding Hooks.onErrorDropped(throwable -> {}); makes it disappear tho, but this is not a way to go I believe.

mkrzywanski avatar Apr 10 '22 19:04 mkrzywanski

@mkrzywanski Also, can you try to reproduce the same just with a pure rsocket to reduce the problem surface?

OlegDokuka avatar Apr 10 '22 19:04 OlegDokuka

I can give it a try, but I am not that familiar with rsocket api yet as I started using it today.

mkrzywanski avatar Apr 10 '22 19:04 mkrzywanski

It seems that the error is caused when cancelling the Flux returned from mongodb changeStream feature. I tried several configurations with and without security/mongodb change stream and only not returning mongodb changeStream flux from my message mapping handler helped.

mkrzywanski avatar Apr 12 '22 10:04 mkrzywanski