rsocket-java icon indicating copy to clipboard operation
rsocket-java copied to clipboard

Limitations of the Resume functionality. No intended to be used by load-balanced server?

Open ansgarschulte opened this issue 9 months ago • 2 comments
trafficstars

I want to use the Resume functionality on a load-balanced server.

Motivation

I can use a persistent ResumableFramesStore by configuring a store with the RSocketServerCustomizer. However, I cannot provide a custom persistent SessionManager, which is why I receive "unknown resume token" exceptions.

Desired solution

Provide a way to implement a custom SessionManager.

ansgarschulte avatar Jan 24 '25 11:01 ansgarschulte

Hi, @ansgarschulte !

Thank you for finding time to fill this issue!

Would you mind providing us with a small sample that can showcase what does not work for you exactly?

Thanks, Oleh

OlegDokuka avatar Jan 27 '25 14:01 OlegDokuka

Hi @OlegDokuka

Here you are: https://github.com/ansgarschulte/rsocket_client_server_resume_echo/tree/feat/redis_store

  1. Start redis app of the repository or a local running redis
  2. Start the server app with RedisResumableFramesStore
  3. note down the server pid
  4. Start client app
  5. see message ping / pong between server and client
  6. kill -9 server_pid
  7. start server application after kill
  8. see error on client:
2025-01-28T20:20:36.655+01:00 ERROR 86185 --- [client] [actor-tcp-nio-6] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
	reactor.core.publisher.Mono.doFinally(Mono.java:2679)
	io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
	*__Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Original Stack Trace:
		at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]

2025-01-28T20:20:36.659+01:00  INFO 86185 --- [client] [           main] .s.b.a.l.ConditionEvaluationReportLogger : 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2025-01-28T20:20:36.669+01:00 ERROR 86185 --- [client] [           main] o.s.boot.SpringApplication               : Application run failed

io.rsocket.exceptions.RejectedResumeException: unknown resume token
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
	reactor.core.publisher.Mono.doFinally(Mono.java:2679)
	io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
	*____Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
	*__________Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
	|_         Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
	|_ Flux.collectList ⇢ at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:52)
Original Stack Trace:
		at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
		at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:53) ~[classes/:na]
		at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
		at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
		at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
		at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
		at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
		at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:60) ~[classes/:na]
		at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
		at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
		at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
		at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
		at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]


ansgarschulte avatar Jan 28 '25 19:01 ansgarschulte