r2dbc-pool
r2dbc-pool copied to clipboard
Connection pool hangs after DB restart
Bug Report
Hello. The application I'm working on is unable to obtain a connection from the pool when the DB is restarted while the app is under high load. It's not always reproducible, this problem only happens after the following exception is thrown:
description=Cannot exchange messages because the connection is closed,
cause=io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed
Please see the full stack trace below. Can be a duplicate for the https://github.com/r2dbc/r2dbc-pool/issues/107 which is closed with no fix.
Please let me know if you need more information from me or steps on how to reproduce it. As noticed earlier, this only happens when the application is processing a high number of requests.
Versions
- Driver: r2dbc-postgres
- Database: Postgres 13
- Java: 11
- OS: Linux
Current Behavior
Stack trace
io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:797) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:243) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:139) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:315) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Operators.error(Operators.java:196) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:83) ~[reactor-core-3.4.2.jar:3.4.2]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:202) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:453) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:781) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:726) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:230) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.error(FluxCreate.java:182) ~[reactor-core-3.4.2.jar:3.4.2]
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.onError(ReactorNettyClient.java:746) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.close(ReactorNettyClient.java:1018) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:518) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:506) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient.access$200(ReactorNettyClient.java:94) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onComplete(ReactorNettyClient.java:909) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:457) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:258) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:393) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:452) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.onInboundClose(ChannelOperations.java:413) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:74) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler.channelInactive(ReactorNettyClient.java:535) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:819) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[netty-transport-native-epoll-4.1.58.Final-linux-x86_64.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
Table schema
Input Code
-- your SQL here;
Steps to reproduce
Input Code
// your code here;
Expected behavior/code
Possible Solution
Additional context
How do you configure the connection pool? Specifically, do you configure ValidationDepth
or a validation query?
I used the pool with default settings and the number of connections reduced to 1 because it's easier to reproduce the issue that way. Can still reproduce it with 10 connections, which is the default value, but it takes more DB restarts to get into a bad state. It looks like each DB restart causes a few connections to get stale. As suggested, I tried adding the following validation properties, but they didn't make any difference:
validationQuery: "SELECT 1"
validationDepth: REMOTE
I've also noticed that this happens only when I use transactions, disabling them fixes the issue. Transactions are enabled in the following way:
val txOperator = TransactionalOperator.create(txManager)
val flux = /* flux assembly */
flux.`as`(txOperator::transactional)
Hi. Could you please let me know whether there are any updates on this issue or maybe you could advise what to look at so I could try debugging it myself? Thank you!
Hi, are there any updates on this issue?
Right now, I don't have a clue whether the driver has an issue or whether the pool is missing a guard somewhere. Also, restarting a server while a connection is active (and therefore, the connection breaks) should result in an error like PostgresConnectionClosedException
. However, after the server is back up again, PostgresConnectionClosedException
should go away as the pool should identify that the connection is not valid anymore.
I also had the same case. However, it was resolved after updating to the latest version. @konsmak Please refer below to update the version and test it.
common : postgreSQL 13.5
before environment
- io.r2dbc:r2dbc-pool:0.8.8.RELEASE
- io.r2dbc:r2dbc-postgresql:0.8.11.RELEASE
- io.r2dbc:r2dbc-spi:0.8.6.RELEASE
current environment (resolved)
- io.r2dbc:r2dbc-pool:0.9.0.RELEASE
- org.postgresql:r2dbc-postgresql:0.9.0.RELEASE
- io.r2dbc:r2dbc-spi:0.9.1.RELEASE
Likely the pool should intercept errors of these kinds (in my case io.r2dbc.spi.R2dbcNonTransientResourceException
) and simply re-establish connection(s). At least I would expect it to behave this way.
I can easily reproduce the error if I run r2dbc connection to PG over an SSH tunnel. Just restart the tunnel, and all current connections are effectively broken yet not closed.
Hi @mp911de , Thanks for finding the root cause and posting it. I am getting the same issue "org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection nested exception is io.r2dbc.spi.R2dbcNonTransientResourceException: Connection validation failed" in MSSQL. I am using below libraries(Version).