r2dbc-pool

Connection pool hangs after DB restart

Open konsmak opened this issue 3 years ago • 8 comments

Bug Report

Hello. The application I'm working on is unable to obtain a connection from the pool when the DB is restarted while the app is under high load. It's not always reproducible; the problem only happens after the following exception is thrown:

description=Cannot exchange messages because the connection is closed,
cause=io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed

Please see the full stack trace below. This may be a duplicate of https://github.com/r2dbc/r2dbc-pool/issues/107, which was closed without a fix.

Please let me know if you need more information from me or steps to reproduce it. As noted above, this only happens when the application is processing a high number of requests.

Versions

  • Driver: r2dbc-postgresql 0.8.6.RELEASE (version taken from the stack trace below)
  • Database: Postgres 13
  • Java: 11
  • OS: Linux

Current Behavior

Stack trace
io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:797) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:243) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:139) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:315) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Operators.error(Operators.java:196) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:83) ~[reactor-core-3.4.2.jar:3.4.2]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:202) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:453) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:781) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:726) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:230) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.error(FluxCreate.java:182) ~[reactor-core-3.4.2.jar:3.4.2]
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.onError(ReactorNettyClient.java:746) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.close(ReactorNettyClient.java:1018) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:518) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:506) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient.access$200(ReactorNettyClient.java:94) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onComplete(ReactorNettyClient.java:909) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:457) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:258) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:393) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:452) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperations.onInboundClose(ChannelOperations.java:413) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:74) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler.channelInactive(ReactorNettyClient.java:535) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:819) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[netty-transport-native-epoll-4.1.58.Final-linux-x86_64.jar:4.1.58.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]

konsmak avatar Oct 21 '21 19:10 konsmak

How do you configure the connection pool? Specifically, do you configure ValidationDepth or a validation query?

mp911de avatar Oct 25 '21 11:10 mp911de

I used the pool with default settings, except that I reduced the number of connections to 1 because that makes the issue easier to reproduce. I can still reproduce it with 10 connections, which is the default, but it takes more DB restarts to get into a bad state. It looks like each DB restart leaves a few connections stale. As suggested, I tried adding the following validation properties, but they made no difference:

validationQuery: "SELECT 1"
validationDepth: REMOTE
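
For context, a validation depth of REMOTE is meant to catch connections that still look open on the client but are dead on the server, whereas LOCAL only inspects client-side state. The toy model below sketches that distinction under simplifying assumptions; `Depth`, `ToyConnection`, and `breakSilently` are hypothetical names made up for illustration and are not the R2DBC SPI or the driver's code.

```java
// Toy model only: these types are hypothetical and are not the R2DBC SPI.
enum Depth { LOCAL, REMOTE }

final class ToyConnection {
    private boolean closedLocally = false;   // the client has seen the close
    private boolean serverReachable = true;  // what a round trip would find

    // Simulates a server restart or dropped tunnel the client has not noticed.
    void breakSilently() { serverReachable = false; }

    void close() { closedLocally = true; }

    // LOCAL looks only at client-side state; REMOTE also simulates a round trip.
    boolean validate(Depth depth) {
        if (closedLocally) {
            return false;
        }
        return depth == Depth.LOCAL || serverReachable;
    }
}

final class ValidationDepthDemo {
    public static void main(String[] args) {
        ToyConnection connection = new ToyConnection();
        connection.breakSilently();
        System.out.println("LOCAL:  " + connection.validate(Depth.LOCAL));
        System.out.println("REMOTE: " + connection.validate(Depth.REMOTE));
    }
}
```

In this model the silently broken connection passes the LOCAL check and fails the REMOTE one, which is why a remote check would be expected to weed out stale connections after a restart.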

I've also noticed that this happens only when I use transactions; disabling them fixes the issue. Transactions are enabled in the following way:

val txOperator = TransactionalOperator.create(txManager)
val flux = /* flux assembly */
flux.`as`(txOperator::transactional)

konsmak avatar Oct 26 '21 17:10 konsmak

Hi. Could you please let me know whether there are any updates on this issue or maybe you could advise what to look at so I could try debugging it myself? Thank you!

konsmak avatar Dec 01 '21 20:12 konsmak

Hi, are there any updates on this issue?

rplowright avatar Feb 15 '22 18:02 rplowright

Right now, I don't have a clue whether the driver has an issue or whether the pool is missing a guard somewhere. Also, restarting a server while a connection is active (and therefore, the connection breaks) should result in an error like PostgresConnectionClosedException. However, after the server is back up again, PostgresConnectionClosedException should go away as the pool should identify that the connection is not valid anymore.
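
The expected behavior described above (a broken connection is recognized as invalid and stops occupying the pool) can be sketched with a small single-threaded toy pool. This is only an illustration of the idea, not r2dbc-pool's implementation; `PooledResource`, `ValidatingPool`, and their methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Toy model only: hypothetical types, not r2dbc-pool's implementation.
final class PooledResource {
    private boolean valid = true;
    boolean isValid() { return valid; }
    void invalidate() { valid = false; }
}

final class ValidatingPool {
    private final Deque<PooledResource> idle = new ArrayDeque<>();
    private final Supplier<PooledResource> factory;
    private final int maxSize;
    private int allocated = 0; // idle + borrowed

    ValidatingPool(int maxSize, Supplier<PooledResource> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    // Returns a valid resource, or null if the pool is exhausted.
    PooledResource acquire() {
        PooledResource candidate;
        while ((candidate = idle.poll()) != null) {
            if (candidate.isValid()) {
                return candidate;
            }
            allocated--; // evict the stale resource and free its slot
        }
        if (allocated < maxSize) {
            allocated++;
            return factory.get();
        }
        return null;
    }

    // Broken resources are dropped on release so they never block a slot.
    void release(PooledResource resource) {
        if (resource.isValid()) {
            idle.push(resource);
        } else {
            allocated--;
        }
    }
}

final class ValidatingPoolDemo {
    public static void main(String[] args) {
        ValidatingPool pool = new ValidatingPool(1, PooledResource::new);
        PooledResource first = pool.acquire();
        first.invalidate();   // simulate the database restart mid-use
        pool.release(first);
        PooledResource second = pool.acquire();
        System.out.println("fresh connection after restart: "
                + (second != null && second != first));
    }
}
```

The point of the sketch is the slot accounting: if a broken resource were neither returned nor counted as freed, a pool of size 1 would have nothing left to hand out, which resembles the hang reported here.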

mp911de avatar Feb 16 '22 08:02 mp911de

I ran into the same problem, but it was resolved after updating to the latest versions. @konsmak, please see the versions below, update, and test again.

Common: PostgreSQL 13.5

Previous environment

  • io.r2dbc:r2dbc-pool:0.8.8.RELEASE
  • io.r2dbc:r2dbc-postgresql:0.8.11.RELEASE
  • io.r2dbc:r2dbc-spi:0.8.6.RELEASE

Current environment (resolved)

  • io.r2dbc:r2dbc-pool:0.9.0.RELEASE
  • org.postgresql:r2dbc-postgresql:0.9.0.RELEASE
  • io.r2dbc:r2dbc-spi:0.9.1.RELEASE

judeKim avatar Feb 16 '22 10:02 judeKim

Likely the pool should intercept errors of these kinds (in my case io.r2dbc.spi.R2dbcNonTransientResourceException) and simply re-establish connection(s). At least I would expect it to behave this way.

I can easily reproduce the error if I run r2dbc connection to PG over an SSH tunnel. Just restart the tunnel, and all current connections are effectively broken yet not closed.
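
Until the pool handles this itself, one possible caller-side approximation of "intercept the error and re-establish the connection" is to retry the work on a fresh connection. The sketch below assumes exactly that and nothing more; `BrokenConnectionException` and `RetryOnBrokenConnection.execute` are hypothetical names, and the connection is modeled as a plain value rather than a real R2DBC connection.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a resource-level failure such as a dropped socket.
final class BrokenConnectionException extends RuntimeException {
    BrokenConnectionException(String message) { super(message); }
}

final class RetryOnBrokenConnection {
    // Runs work on a connection from the supplier; when the connection turns
    // out to be broken, asks for another one, up to maxAttempts in total.
    static <C, R> R execute(Supplier<C> connections, Function<C, R> work, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        BrokenConnectionException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            C connection = connections.get();
            try {
                return work.apply(connection);
            } catch (BrokenConnectionException e) {
                last = e; // drop this connection and try a fresh one
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] handedOut = {0};
        // The first connection handed out is "broken"; later ones work.
        Supplier<Integer> connections = () -> ++handedOut[0];
        Function<Integer, String> work = id -> {
            if (id == 1) {
                throw new BrokenConnectionException("connection " + id + " is closed");
            }
            return "ok on connection " + id;
        };
        System.out.println(execute(connections, work, 3));
    }
}
```

A blind retry like this is only reasonable when the work is safe to repeat; a statement that failed in the middle of a transaction generally cannot be replayed on its own.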

amorozov avatar Jan 25 '24 10:01 amorozov

Hi @mp911de, thanks for finding the root cause and posting it. I am getting the same issue in MSSQL:

org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection nested exception is io.r2dbc.spi.R2dbcNonTransientResourceException: Connection validation failed

I am using the following library versions:

  • io.r2dbc:r2dbc-pool:1.0.0.RELEASE
  • io.r2dbc:r2dbc-mssql:1.0.0.RELEASE
  • r2dbc-spi-1.0.0.RELEASE.jar

Please help.

menakaprabu avatar Feb 15 '24 23:02 menakaprabu