rsocket-java

add auto reconnection option

Open sergey-mesh opened this issue 3 years ago • 4 comments

It would be convenient to have two options for establishing a connection: lazy, on the first call, and automatic, as soon as the server is available. Take a chat application, for example: it is desirable that a running client connects to the server automatically and shows server availability as soon as possible. The first option works great, but the second requires an additional workaround. Thanks to Oleg (@OlegDokuka), who helped me arrive at an optimal solution to this issue:

requester
                .rsocketClient()
                .source()
                .flatMap(rsocket -> rsocket.onClose())
                .repeat()
                .retryWhen(Retry.backoff(...))
                .subscribe();

This works great, but it would be more convenient to have a similar option out of the box, so that you can choose one of the two modes (lazy or auto-connect) in the connection settings. It could even be possible to set a condition for switching from one mode to the other at runtime.
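For readers less familiar with Reactor, the workaround chain can be read as ordinary control flow: connect, wait for the connection to close, connect again (repeat), and when a connect attempt fails, try again until the retry budget is used up (retryWhen). The sketch below is only a plain-Java model of that reading, not rsocket-java or Reactor code. The class name, the scripted list of connect outcomes, and the cumulative retry counter are assumptions made for illustration, and delays between attempts are left out.

```java
import java.util.ArrayList;
import java.util.List;

class KeepAliveLoopSketch {

    // Walks a scripted list of connect outcomes (true = connected, false = refused)
    // and returns the events a keep-alive loop would go through.
    // Assumption: failed attempts are counted cumulatively against maxRetries.
    static List<String> run(List<Boolean> connectOutcomes, int maxRetries) {
        List<String> events = new ArrayList<>();
        int retries = 0;
        for (boolean connected : connectOutcomes) {
            if (connected) {
                // The source produced a live connection; once it closes,
                // the "repeat" step asks for a connection again.
                events.add("connected");
                events.add("closed");
            } else if (retries < maxRetries) {
                // The "retryWhen" step: a failed attempt is retried (delay omitted here).
                retries++;
                events.add("retry " + retries + "/" + maxRetries);
            } else {
                // Retry budget used up: the error reaches the subscriber and the loop ends.
                events.add("gave up");
                break;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Connect, lose the connection, fail twice, connect again, then fail past the budget.
        List<Boolean> script = List.of(true, false, false, true, false);
        System.out.println(run(script, 2));
        // prints [connected, closed, retry 1/2, retry 2/2, connected, closed, gave up]
    }
}
```

In this model the loop ends only when the retry budget runs out, which is why the choice of retry policy decides how long a client keeps trying to come back.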

sergey-mesh • Feb 22 '21 20:02

Hey, I have a similar problem...

I've been working on this project https://github.com/ivangfr/springboot-rsocket-webflux-aop where I use rsocket.

The project is composed of movie-client-shell, movie-client-ui, and movie-server. Here is the Project Architecture where you can find a diagram that explains the communication among the services.

One problem that I see in my solution is that, once movie-server restarts, movie-client-ui doesn't connect to it anymore.

I have changed the MovieServerRSocketConfig of the movie-client-ui service as @OlegDokuka suggested in issue #984.

Now I am creating the RSocketRequester this way:

...
SocketAcceptor socketAcceptor = RSocketMessageHandler.responder(rSocketStrategies, movieClientUiController);

RSocketRequester rSocketRequester = rSocketRequesterBuilder
        .setupRoute("client.registration")
        .setupData(clientId)
        .rsocketStrategies(rSocketStrategies)
        .rsocketConnector(connector -> connector.acceptor(socketAcceptor))
        .transport(clientTransport);

rSocketRequester.rsocketClient()
        .source()
        .flatMap(RSocket::onClose)
        .repeat()
        .retryWhen(Retry.fixedDelay(120, Duration.ofSeconds(1)))
        .doOnError(error -> log.warn("Connection CLOSED"))
        .doFinally(consumer -> log.info("DISCONNECTED"))
        .subscribe();
...

However, it's not working.

Basically,

  1. I am starting movie-server and movie-client-ui.
  2. Once both are up and running, I restart movie-server. It takes around 10 seconds to shut down and start.
  3. Now, both movie-server and movie-client-ui are up and running, however, movie-client-ui is not connected to movie-server.
  4. Two minutes later, I see the following exception in movie-client-ui. It looks like it's trying to connect without success.
    ERROR 17499 --- [     parallel-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped
    
    reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
    Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
    	at reactor.core.Exceptions.retryExhausted(Exceptions.java:290) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.3.jar:3.4.3]
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8080
    Caused by: java.net.ConnectException: Connection refused
    	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
    	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]
    	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)    ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    
    INFO 17499 --- [     parallel-1] c.m.m.config.MovieServerRSocketConfig    : DISCONNECTED
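
The two-minute delay in step 4 is consistent with the retry settings shown earlier: Retry.fixedDelay(120, Duration.ofSeconds(1)) allows 120 retries spaced one second apart, so the waits alone add up to about 120 seconds before the "Retries exhausted: 120/120" error surfaces. Below is a minimal plain-Java sketch of that arithmetic. The class and method names are hypothetical, and it does not use Reactor itself.

```java
import java.time.Duration;

class RetryBudgetSketch {

    // Time spent waiting between attempts under a fixed-delay policy:
    // every retry waits the same delay, so the total is retries * delay.
    // The time taken by the connect attempts themselves is not included.
    static Duration totalWait(long maxRetries, Duration fixedDelay) {
        return fixedDelay.multipliedBy(maxRetries);
    }

    // Mirrors the "Retries exhausted: N/N" wording seen in the log above.
    static String exhaustedMessage(long maxRetries) {
        return "Retries exhausted: " + maxRetries + "/" + maxRetries;
    }

    public static void main(String[] args) {
        Duration wait = totalWait(120, Duration.ofSeconds(1));
        System.out.println(wait.toMinutes() + " min, " + exhaustedMessage(120));
        // prints 2 min, Retries exhausted: 120/120
    }
}
```

A larger attempt count or a longer delay stretches this window, but any bounded policy eventually ends with the same exhausted error.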
    

Update

Btw, I can start all services using the profile rsocket-websocket or rsocket-tcp.

The previous run used the rsocket-websocket profile, which is why it was trying to connect to movie-server on port 8080.

Running the simulation again with the rsocket-tcp profile, the retry exception is:

ERROR 18727 --- [     parallel-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
	at reactor.core.Exceptions.retryExhausted(Exceptions.java:290) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.3.jar:3.4.3]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:7000
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

INFO 18727 --- [     parallel-1] c.m.m.config.MovieServerRSocketConfig    : DISCONNECTED

Now it is trying to connect to movie-server on port 7000 (which seems correct):

Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:7000

ivangfr • Mar 09 '21 17:03

@ivangfr, in your case you have not added the .reconnect(Retry...) bits to your setup, which is why you were not able to reconnect. Please modify your setup as follows:

RSocketRequester rSocketRequester = rSocketRequesterBuilder
        .setupRoute("client.registration")
        .setupData(clientId)
        .rsocketStrategies(rSocketStrategies)
        .rsocketConnector(connector -> connector.acceptor(socketAcceptor).reconnect(Retry...))
        .transport(clientTransport);

Also, your setup works only for a specific transport, so switching between websocket and tcp on the fly will not work at all: in that case the client will still be trying to reconnect via the websocket transport.

OlegDokuka • Mar 10 '21 08:03

Thanks @OlegDokuka

It works!

So, I needed the following config:

        SocketAcceptor socketAcceptor = RSocketMessageHandler.responder(rSocketStrategies, movieClientUiController);
        RetryBackoffSpec retryBackoffSpec = Retry.fixedDelay(120, Duration.ofSeconds(1));

        RSocketRequester rSocketRequester = rSocketRequesterBuilder
                .setupRoute("client.registration")
                .setupData(clientId)
                .rsocketStrategies(rSocketStrategies)
                .rsocketConnector(connector -> connector.acceptor(socketAcceptor).reconnect(retryBackoffSpec))
                .transport(clientTransport);

        rSocketRequester.rsocketClient()
                .source()
                .flatMap(RSocket::onClose)
                .repeat()
                .retryWhen(retryBackoffSpec)
                .doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("DISCONNECTED"))
                .subscribe();

In fact, I don't change the transport on the fly. As I described in the README, the user needs to pick the profile at startup. It can be rsocket-tcp, rsocket-websocket or default.

So, if movie-server is started with rsocket-tcp, movie-client-ui must also be started with rsocket-tcp, as I note in this comment:

...
- movie-client-ui

Open a new terminal and, inside springboot-rsocket-webflux-aop root folder, run one of the following commands (it should match with the one picked to run movie-server)
...

Thanks again for the help!

ivangfr • Mar 10 '21 21:03

Just a link to another example that shows the need for this feature (see #1039).

OlegDokuka • Mar 18 '22 10:03