rsocket-java
java.util.concurrent.CancellationException: Disposed when disposing flux from channel communication in tests
I have created a sample application that uses channel-style communication. Unfortunately, when cancelling the subscription from the tests, I receive an exception:
stacktrace
022-04-10 20:33:49.106 ERROR 338925 --- [or-http-epoll-4] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.2.jar:na]
at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.2.jar:na]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.16.jar:3.4.16]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.16.jar:3.4.16]
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.16.jar:3.4.16]
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.16.jar:3.4.16]
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.16.jar:3.4.16]
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar:na]
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:823) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-classes-epoll-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Expected Behavior
The exception is not thrown on channel subscription cancellation.
Actual Behavior
Exception is thrown.
Steps to Reproduce
Sample application: https://github.com/mkrzywanski/rsocket-reactive-chat. Just run the user1ShouldGetMessagesFromUser2 test.
Your Environment
Library versions are listed in the provided project.
- Platform (e.g. JVM version (java -version) or Node version (node --version)): openjdk 17.0.1 2021-10-19 OpenJDK Runtime Environment Temurin-17.0.1+12 (build 17.0.1+12) OpenJDK 64-Bit Server VM Temurin-17.0.1+12 (build 17.0.1+12, mixed mode, sharing)
- OS and version (e.g. uname -a): Ubuntu 20
@mkrzywanski can you try adding a call to Hooks.onOperatorDebug() to your main method to see the full stack trace of the exception?
Since it appears in tests, I added it to my @BeforeAll method, but the stack trace has not changed; it remains the same. The exception also seems to be logged multiple times from different threads.
Adding Hooks.onErrorDropped(throwable -> {}); makes it disappear, though I don't believe this is the way to go.
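For anyone who wants something narrower than swallowing every dropped error, here is a minimal sketch of a filter that ignores only errors with a CancellationException somewhere in their cause chain and forwards everything else. DroppedErrorFilter and its methods are hypothetical names, not part of Reactor or RSocket, and this only hides the log noise; it does not address whatever is causing the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Reactor or RSocket.
final class DroppedErrorFilter {

    private DroppedErrorFilter() {}

    // Returns true if the error, or anything in its cause chain, is a
    // CancellationException. The depth limit guards against cyclic chains.
    static boolean isCancellation(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof CancellationException) {
                return true;
            }
        }
        return false;
    }

    // Wraps a handler so that cancellation errors are ignored and every
    // other error is still passed on to the delegate.
    static Consumer<Throwable> ignoringCancellation(Consumer<Throwable> delegate) {
        return error -> {
            if (!isCancellation(error)) {
                delegate.accept(error);
            }
        };
    }

    public static void main(String[] args) {
        List<Throwable> reported = new ArrayList<>();
        Consumer<Throwable> handler = ignoringCancellation(reported::add);

        // Same shape as in the stack trace: a wrapper caused by "Disposed".
        handler.accept(new RuntimeException(new CancellationException("Disposed")));
        // An unrelated error must still reach the delegate.
        handler.accept(new IllegalStateException("boom"));

        System.out.println("Reported errors: " + reported.size());
    }
}
```

In a test setup, the resulting consumer could be passed to Hooks.onErrorDropped(...) in @BeforeAll and removed again with Hooks.resetOnErrorDropped() afterwards, so other dropped errors still show up in the logs.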
@mkrzywanski Also, can you try to reproduce this with pure RSocket, to reduce the problem surface?
I can give it a try, but I am not that familiar with the RSocket API yet, as I only started using it today.
It seems that the error is caused by cancelling the Flux returned from the MongoDB changeStream feature. I tried several configurations, with and without security and the MongoDB change stream, and the only thing that helped was not returning the MongoDB changeStream Flux from my message mapping handler.