rsocket-java

KeepAliveSupport$ClientKeepAliveSupport fills the Executor queue

Open · adrian-tarau opened this issue 4 years ago · 2 comments

While troubleshooting some issues, I added support for exporting Reactor metrics to Grafana. To my surprise, the executor queues were always full of tasks, even when very little was happening in the app.

| Metric | Unit | Value | Tags | Description |
|---|---|---|---|---|
| executor.queued | tasks | 14 | name=parallel(10,"parallel")-0, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 15 | name=parallel(10,"parallel")-1, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 14 | name=parallel(10,"parallel")-2, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 14 | name=parallel(10,"parallel")-3, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 15 | name=parallel(10,"parallel")-4, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 14 | name=parallel(10,"parallel")-5, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 15 | name=parallel(10,"parallel")-6, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 15 | name=parallel(10,"parallel")-7, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 15 | name=parallel(10,"parallel")-8, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |
| executor.queued | tasks | 13 | name=parallel(10,"parallel")-9, reactor.scheduler.id=parallel(10,"parallel") | The approximate number of tasks that are queued for execution |

I took a memory dump and inspected the queues: they were full of `KeepAliveSupport$ClientKeepAliveSupport` entries.

There is little documentation on keep-alive beyond "it is required" (according to the JavaDoc comments). I tried setting the interval to 0: the tasks disappeared from the queue, but then nothing worked :)

Am I missing something? For anyone who wants to monitor how the system is doing by looking at its internals, this is confusing: there are many tasks (one per connection) sitting in the "pending" state.
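A minimal, stdlib-only sketch of why an idle application can still report a non-empty `executor.queued` gauge. It assumes (not confirmed in this thread) that each Reactor `parallel` worker wraps a single-threaded `ScheduledThreadPoolExecutor` and that the gauge reports `getQueue().size()`. A periodic task, such as a per-connection keep-alive tick, waits in the executor's delay queue between runs, so it is counted as queued even though nothing is ready to execute. `PeriodicQueueDemo` and `queuedWhileIdle` are hypothetical names.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Illustration only (not RSocket or Reactor code): one periodic "keep-alive"
 * task per simulated connection on a single-threaded scheduled executor.
 */
class PeriodicQueueDemo {

    /** Schedules one periodic task per connection and returns the queue size while idle. */
    static int queuedWhileIdle(int connections, long periodMillis) {
        ScheduledThreadPoolExecutor worker = new ScheduledThreadPoolExecutor(1);
        try {
            for (int i = 0; i < connections; i++) {
                // Hypothetical keep-alive tick; its body does not affect the queue size.
                worker.scheduleAtFixedRate(() -> { }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            }
            // No task is due yet, so every periodic task is still waiting in the delay queue.
            return worker.getQueue().size();
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileIdle(15, 10_000)); // prints 15
    }
}
```

Under these assumptions, the 144 queued entries across the 10 workers in the table above would be consistent with roughly one periodic tick per connection, although the sketch does not prove that this is the cause.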

adrian-tarau · May 06 '20 01:05

Keep-alive is an important part of the connection-failure diagnosis that RSocket provides. That is why the JavaDoc says to enable it and to set a meaningful ping interval and timeout.
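To make the roles of the ping interval and the timeout concrete, here is a hypothetical keep-alive tracker driven by a logical clock. It is not the rsocket-java implementation: `KeepAliveTracker`, its `Action` values, and the decision rules are assumptions chosen for illustration. A ping is due every `intervalMillis`, and the connection is reported as failed once nothing has been received from the peer for `timeoutMillis`.

```java
/**
 * Hypothetical keep-alive bookkeeping (not the rsocket-java implementation).
 * Times are passed in explicitly so the logic is deterministic and testable.
 */
class KeepAliveTracker {
    enum Action { NONE, SEND_PING, CLOSE_CONNECTION }

    private final long intervalMillis;
    private final long timeoutMillis;
    private long lastPingSent;
    private long lastFrameReceived;

    KeepAliveTracker(long intervalMillis, long timeoutMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.timeoutMillis = timeoutMillis;
        this.lastPingSent = nowMillis;
        this.lastFrameReceived = nowMillis;
    }

    /** Call whenever any frame (including a keep-alive response) arrives from the peer. */
    void onFrameReceived(long nowMillis) {
        lastFrameReceived = nowMillis;
    }

    /** Called on every scheduler tick; decides what the connection should do now. */
    Action onTick(long nowMillis) {
        if (nowMillis - lastFrameReceived >= timeoutMillis) {
            return Action.CLOSE_CONNECTION; // peer silent for too long: report the failure
        }
        if (nowMillis - lastPingSent >= intervalMillis) {
            lastPingSent = nowMillis;
            return Action.SEND_PING; // interval elapsed: probe the peer
        }
        return Action.NONE;
    }
}
```

For example, with a 20 s interval and a 90 s timeout, a peer that stops sending frames is reported as failed on the first tick at or after 90 s of silence. Each tick has to be scheduled somewhere, which is where a periodic task per connection comes from.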

I believe you are using `MicrometerDuplexConnection`, which records all signals at the connection level (essentially every frame that goes through the stream).

So my question is: is that something you want to record?
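Connection-level recording like this can be pictured as a decorator around the connection. The following is a hypothetical, stdlib-only sketch in the spirit of `MicrometerDuplexConnection`, not its actual code: `CountingConnection`, the simplified `Connection` interface, and the three `FrameType` values are illustrative. Because every frame is counted, even an idle connection keeps producing `KEEPALIVE` frames in the metrics.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical metrics decorator: counts outbound frames by type, then forwards them. */
class CountingConnection {
    enum FrameType { KEEPALIVE, REQUEST_RESPONSE, PAYLOAD }

    /** Minimal stand-in for a duplex connection that can send frames. */
    interface Connection {
        void send(FrameType type);
    }

    private final Connection delegate;
    private final Map<FrameType, Long> sent = new EnumMap<>(FrameType.class);

    CountingConnection(Connection delegate) {
        this.delegate = delegate;
    }

    /** Records the frame type, then passes the frame unchanged to the wrapped connection. */
    void send(FrameType type) {
        sent.merge(type, 1L, Long::sum);
        delegate.send(type);
    }

    /** Number of frames of the given type sent so far (0 if none). */
    long sentCount(FrameType type) {
        return sent.getOrDefault(type, 0L);
    }
}
```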

OlegDokuka · May 06 '20 06:05

Meaningful, but not mandatory? When I set the ping interval to 0, which the code allows, nothing worked. In any case, the goal is not to remove this feature; the goal is to be able to see what is pending. If a large number of tasks are pending (in a given service or across all services), there is a problem. However, that signal is lost because the queues are full of these ping tasks.
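One way to recover the "what is really pending" signal, assuming the keep-alive ticks are submitted as periodic tasks (for example through `Flux.interval`) onto a `ScheduledThreadPoolExecutor`, is a custom gauge that skips periodic entries when sampling the queue. This is a sketch, not a Reactor or Micrometer API; `PendingWorkGauge` and `nonPeriodicQueued` are hypothetical names. It relies on `RunnableScheduledFuture.isPeriodic()`, which the tasks queued by the JDK's `ScheduledThreadPoolExecutor` implement.

```java
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical gauge: counts queued tasks that are not periodic (e.g. not keep-alive ticks). */
class PendingWorkGauge {

    static long nonPeriodicQueued(ScheduledThreadPoolExecutor executor) {
        return executor.getQueue().stream()
                .filter(task -> !(task instanceof RunnableScheduledFuture
                        && ((RunnableScheduledFuture<?>) task).isPeriodic()))
                .count();
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor worker = new ScheduledThreadPoolExecutor(1);
        for (int i = 0; i < 14; i++) {
            // Simulated keep-alive ticks for 14 idle connections.
            worker.scheduleAtFixedRate(() -> { }, 20, 20, TimeUnit.SECONDS);
        }
        worker.schedule(() -> { }, 1, TimeUnit.MINUTES); // one delayed piece of real work
        System.out.println(worker.getQueue().size() + " queued, "
                + nonPeriodicQueued(worker) + " non-periodic"); // prints "15 queued, 1 non-periodic"
        worker.shutdownNow();
    }
}
```

The count is a point-in-time snapshot of the queue, so it is approximate in the same sense as the existing `executor.queued` gauge.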

The only thing I changed was enabling metrics:

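```java
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;

// metrics(true) enables Reactor Netty metrics on the TCP client.
// The doOn* operators are chained so that they apply to the Mono that is subscribed.
Mono<RSocket> mono = RSocketFactory.connect()
        .transport(TcpClientTransport.create(
                TcpClient.create().host(host).port(port).metrics(true)))
        .start()
        .doOnError(Throwable.class, throwable -> handleFailure(throwable, false))
        .doOnDiscard(Object.class, this::handleDiscard)
        .doOnTerminate(this::reportTermination);
mono.subscribe(this::connectSuccessful, this::connectFailed,
        () -> LOGGER.info("Subscriber completed for " + getDescription()));
```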

adrian-tarau · May 06 '20 12:05