
ReactorNettyClient gets stuck on a cancelled Conversation if that Conversation has more than 256 rows (the default of reactor.bufferSize.small)

Open alexeykurshakov opened this issue 1 year ago • 14 comments

Bug Report

Versions

  • Driver: 1.0.5
  • Database: PostgreSQL 13.12
  • Java: 17
  • OS: MacOS, Linux

Current Behavior

When a query is zipped in parallel with another function that fails, and that query returns more than 256 rows, you can end up with no real consumer: the chain has been cancelled, but data keeps arriving from the database and is saved into ReactorNettyClient's buffer. Once this happens, any further attempt to fetch data from the database fails, because ReactorNettyClient.BackendMessageSubscriber.tryDrainLoop never calls drainLoop: the stuck conversation has no demand.

private void tryDrainLoop() {
    while (hasBufferedItems() && hasDownstreamDemand()) {
        if (!drainLoop()) {
            return;
        }
    }
}

This can be reproduced with https://github.com/agorbachenko/r2dbc-connection-leak-demo. If you increase the system property "reactor.bufferSize.small" to 350, the attached example starts working.
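
For illustration, here is a minimal sketch of the kind of pipeline that can trigger this; it is not taken from the demo project, and the connection URL, table, and column names are made up. A large query is zipped with a publisher that fails, so the query's subscription is cancelled while rows are still streaming in.

import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CancelledConversationSketch {

    public static void main(String[] args) {
        // Hypothetical connection URL; any r2dbc-postgresql URL works here.
        ConnectionFactory connectionFactory =
                ConnectionFactories.get("r2dbc:postgresql://user:secret@localhost:5432/test");

        // A query returning well over 256 rows (hypothetical table/column;
        // connection cleanup omitted for brevity).
        Flux<String> bigQuery = Mono.from(connectionFactory.create())
                .flatMapMany(connection -> Flux
                        .from(connection.createStatement("SELECT name FROM big_table").execute())
                        .flatMap(result -> result.map((row, meta) -> row.get("name", String.class))));

        // A sibling publisher that fails immediately.
        Mono<String> failing = Mono.error(new IllegalStateException("boom"));

        // zip cancels bigQuery as soon as failing errors; the rows already sent by the
        // server for the cancelled conversation end up buffered in ReactorNettyClient.
        Mono.zip(bigQuery.collectList(), failing)
                .onErrorResume(e -> Mono.empty())
                .block();
    }
}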

alexeykurshakov avatar Sep 25 '24 04:09 alexeykurshakov

Thanks a lot for chasing this issue down. Since you invested about 80% of the effort that is required to fix the issue, do you want to submit a pull request to clear out the cancelled conversations?

mp911de avatar Sep 25 '24 07:09 mp911de

I've never worked with the Reactor library (Mono, Flux) before. But I found that it's not easy to track down the source of a cancellation: an error in a parallel zip function, an ordinary cancel, or the cancellation issued by Mono.from(fluxPublisher). For example

Mono.from(Flux.just(1, 2, 3).doOnCancel(() -> {
    System.out.println("fire");
})).subscribe();

fires the println as soon as the first item is emitted, while

Flux.just(1, 2, 3).doOnCancel(() -> {
    System.out.println("fire");
}).subscribe();

never prints "fire". If you can help me track down the type of cancellation, sure, I can make a pull request.

alexeykurshakov avatar Sep 25 '24 10:09 alexeykurshakov

@alexeykurshakov these cancellations have reasonable explanations. A couple examples:

  • Mono.from(Publisher).subscribe() cancels the Publisher once the first item is emitted, as Mono expects at most one item to be emitted to the Subscriber.
  • Flux.just(T...).subscribe() has no reason to cancel at all, as multiple items adhere to the Flux specification.
  • Flux.zip(Publisher, Publisher).subscribe() will cancel the other Publisher once one of them completes/errors (see the sketch below).
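
To make the zip case concrete, here is a minimal, self-contained Reactor sketch (not taken from the thread) that shows the other source being cancelled when its sibling errors:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipCancellationDemo {

    public static void main(String[] args) {
        // When the erroring source fails, zip cancels the other, still-running source;
        // this is the same cancel signal the driver sees on a cancelled conversation.
        Flux.zip(
                Flux.never().doOnCancel(() -> System.out.println("other source cancelled")),
                Mono.error(new IllegalStateException("boom")))
            .onErrorResume(e -> Flux.empty())
            .blockLast();
    }
}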

For inspiration regarding test cases, perhaps you can use my examples with mocks. They were part of the investigation into whether r2dbc-pool is responsible for the connection leaks in https://github.com/r2dbc/r2dbc-pool/issues/198#issuecomment-1980367615.

chemicL avatar Sep 25 '24 12:09 chemicL

@mp911de https://github.com/pgjdbc/r2dbc-postgresql/blob/a13c02c09f83d44feaaea6d3416e3d6d5e0a5ad6/src/main/java/io/r2dbc/postgresql/PostgresqlStatement.java#L257 If you look at SimpleQueryMessageFlow.exchange, the original cancellation is simply ignored. I don't understand the intended behaviour: why do you discard the cancellation with Operators.discardOnCancel, and what is .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release) supposed to do?

alexeykurshakov avatar Sep 26 '24 12:09 alexeykurshakov

Operators.discardOnCancel is to drain protocol frames off the transport so that we can finalize the conversation with the server. If we just cancelled the consumption, then response frames from an earlier conversation would remain on the transport and feed into the next conversation.
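
For context on the second half of the question: doOnDiscard(Class, Consumer) is plain Reactor API that registers a cleanup hook for elements dropped by upstream operators (for example on cancellation or filtering); the driver presumably chains it with ReferenceCountUtil::release so that discarded reference-counted Netty messages are released rather than leaked. A minimal, generic illustration (not driver code):

import reactor.core.publisher.Flux;

public class DiscardHookDemo {

    public static void main(String[] args) {
        // Elements rejected by the upstream filter are handed to the discard hook.
        // In the driver, the hook is ReferenceCountUtil::release on ReferenceCounted
        // messages instead of a println.
        Flux.range(1, 5)
            .filter(i -> i % 2 == 0)
            .doOnDiscard(Integer.class, i -> System.out.println("discarded " + i))
            .blockLast();
    }
}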

mp911de avatar Sep 26 '24 14:09 mp911de

Sounds like it should work, but it doesn't 🤣. According to the issue example, badThread never consumes data and sends the cancel signal only after the real data has already reached ReactorNettyClient, which then saves these messages in its internal buffer. So in that example the discard happens too late.

alexeykurshakov avatar Sep 26 '24 14:09 alexeykurshakov

I can provide a timeline of what happened. And then we'll figure out how to fix it.

alexeykurshakov avatar Sep 26 '24 14:09 alexeykurshakov

Hello! We've been hit by a similar issue this past week during some load testing. I have attached a stacktrace. We also saw a few Netty LEAK error stacktraces.

  • Spring Webflux and Kotlin, so we're using coroutines to await responses
  • We use kotlinx.coroutines.withTimeout around these queries
  • It usually only happens on one server instance
  • Lasts 15-30 seconds
  • Server recovers
  • Simple queries against the primary key (e.g., SELECT * FROM X WHERE ID = ?)
  • We're using a connection pool

travispeloton avatar Oct 29 '24 20:10 travispeloton

@alexeykurshakov Have you managed to conduct further investigation?

agorbachenko avatar Nov 27 '24 06:11 agorbachenko

@travispeloton Could you provide more details about your testing environment? I don't quite understand what you mean by "It usually only happens on one server instance". I also assume more than a single query is running in your test suite, since the queue limit can be exceeded in other places as well. Am I right? For further investigation it would be nice if you could provide an example like agorbachenko did in https://github.com/agorbachenko/r2dbc-connection-leak-demo

alexeykurshakov avatar Nov 28 '24 10:11 alexeykurshakov

@agorbachenko unfortunately, no. We applied a temporary workaround in the project and are moving to jOOQ instead.

alexeykurshakov avatar Nov 28 '24 10:11 alexeykurshakov

@alexeykurshakov we haven't seen the issue again.

For "It usually only happens on one server instance", we run multiple k8s pods, so it was observed on a single pod in the 3 different times we saw it.

"not only a single query" - in my case there is one type of query that currently dominates traffic

travispeloton avatar Nov 28 '24 14:11 travispeloton

We encountered a similar problem. The application runs in k8s with 1 CPU. When a background task (run on Schedulers.boundedElastic()) requests 1000 entities from an R2DBC repository, it gets only 512 entities; the connection then gets stuck and cannot be returned to the pool. I couldn't reproduce the issue on my local machine (4 cores / 8 logical cores), but when I start the JVM with -XX:ActiveProcessorCount=1, I can reproduce it.

I apologize for not being able to present an example.

kuzd4niil avatar May 29 '25 08:05 kuzd4niil

I worked around the problem by adding .collectList() after fetching the 1000 entities from the R2dbcRepository. I think my problem was resource starvation or something like that. Previously every entity was handled in .flatMap; now I use .concatMap for every entity.
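
A rough sketch of the shape of that workaround; the entity type, repository, and handleEntity are placeholders, not code from this project:

import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class WorkaroundSketch<E> {

    Mono<Void> process(ReactiveCrudRepository<E, Long> repository) {
        return repository.findAll()              // Flux<E> streamed from the connection
            .collectList()                       // buffer the whole result set first,
                                                 // so the connection is fully drained
            .flatMapMany(Flux::fromIterable)
            .concatMap(this::handleEntity)       // sequential handling instead of flatMap
            .then();
    }

    Mono<Void> handleEntity(E entity) {
        return Mono.empty();                     // placeholder for per-entity work
    }
}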

kuzd4niil avatar May 30 '25 18:05 kuzd4niil