rsocket-java
rsocket-java copied to clipboard
Limitations of the Resume functionality. No intended to be used by load-balanced server?
I want to use the Resume functionality on a load-balanced server.
Motivation
I can use a persistent ResumableFramesStore by configuring a store with the RSocketServerCustomizer. However, I cannot provide a custom persistent SessionManager, which is why I receive "unknown resume token" exceptions.
Desired solution
Provide a way to implement a custom SessionManager.
Hi, @ansgarschulte !
Thank you for finding time to fill this issue!
Would you mind providing us with a small sample that can showcase what does not work for you exactly?
Thanks, Oleh
Hi @OlegDokuka
Here you are: https://github.com/ansgarschulte/rsocket_client_server_resume_echo/tree/feat/redis_store
- Start redis app of the repository or a local running redis
- Start the server app with RedisResumableFramesStore
- note down the server pid
- Start client app
- see message ping / pong between server and client
- kill -9 server_pid
- start server application after kill
- see error on client:
2025-01-28T20:20:36.655+01:00 ERROR 86185 --- [client] [actor-tcp-nio-6] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
reactor.core.publisher.Mono.doFinally(Mono.java:2679)
io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
*__Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Original Stack Trace:
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]
2025-01-28T20:20:36.659+01:00 INFO 86185 --- [client] [ main] .s.b.a.l.ConditionEvaluationReportLogger :
Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2025-01-28T20:20:36.669+01:00 ERROR 86185 --- [client] [ main] o.s.boot.SpringApplication : Application run failed
io.rsocket.exceptions.RejectedResumeException: unknown resume token
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
reactor.core.publisher.Mono.doFinally(Mono.java:2679)
io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
*____Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
*__________Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
|_ Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
|_ Flux.collectList ⇢ at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:52)
Original Stack Trace:
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:53) ~[classes/:na]
at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:60) ~[classes/:na]
at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]