r2dbc-mssql

Random `IllegalReferenceCountException: refCnt: 0`

Open dstepanov opened this issue 3 years ago • 3 comments

We have a randomly failing test on GitHub Actions; unfortunately, there are no steps to reproduce it.

Versions

  • Driver: Arabba-SR12 - io.r2dbc:r2dbc-mssql:0.8.8.RELEASE
  • Java: 1.8
  • OS: Linux - GitHub Actions

io.micronaut.data.r2dbc.sqlserver.SqlServerRepositoryPoolSpec test custom delete FAILED

  java.lang.RuntimeException: Async resource cleanup failed after onComplete
      at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
      at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:136)
      at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
      at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
      at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97)
      at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:203)
      at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:414)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:414)
      at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
      at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
      at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:548)
      at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:415)
      at reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:209)
      at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
      at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
      at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
      at io.r2dbc.mssql.client.ReactorNettyClient$ExchangeRequest$1.onSuccess(ReactorNettyClient.java:753)
      at io.r2dbc.mssql.client.ReactorNettyClient$RequestQueue.submit(ReactorNettyClient.java:689)
      at io.r2dbc.mssql.client.ReactorNettyClient$ExchangeRequest.submit(ReactorNettyClient.java:749)
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$14(ReactorNettyClient.java:586)
      at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
      at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
      at io.r2dbc.mssql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:52)
      at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83)
      at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:103)
      at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
      at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:202)
      at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:87)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
      at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
      at io.r2dbc.pool.MonoDiscardOnCancel$MonoDiscardOnCancelSubscriber.onNext(MonoDiscardOnCancel.java:92)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
      at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:229)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
      at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
      at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
      at io.r2dbc.pool.MonoDiscardOnCancel.subscribe(MonoDiscardOnCancel.java:50)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
      at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:464)
      at reactor.pool.SimpleDequePool.lambda$drainLoop$8(SimpleDequePool.java:340)
      at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
      at reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:340)
      at reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:558)
      at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268)
      at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:427)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
      at reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
      at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
      at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:117)
      at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:50)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4384)
      at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:103)
      at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.Mono.blockOptional(Mono.java:1750)
      at io.micronaut.data.operations.reactive.BlockingReactorRepositoryOperations.executeUpdate(BlockingReactorRepositoryOperations.java:92)
      at io.micronaut.data.operations.RepositoryOperations.executeDelete(RepositoryOperations.java:195)
      at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:53)
      at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:35)
      at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:114)
      at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:75)
      at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
      at io.micronaut.validation.ValidatingInterceptor.intercept(ValidatingInterceptor.java:138)
      at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
      at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanupData(AbstractRepositorySpec.groovy:129)
      at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanup(AbstractRepositorySpec.groovy:141)
  Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619)
      at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629)
      at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:619)
      at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:317)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274)
      at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
      at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503)
      ... 65 more
  Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
      at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.memoryAddress(PooledUnsafeDirectByteBuf.java:241)
      at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:524)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:193)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1104)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1096)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1087)
      at io.r2dbc.mssql.client.StreamDecoder$DecoderState.initial(StreamDecoder.java:180)
      at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:85)
      at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:255)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
      ... 69 more

dstepanov, Jan 21 '22 05:01

This looks as if the connection was closed earlier and some activity still happened afterwards. The `refCnt: 0` comes from the buffer that Netty read for decoding. Do you see any preceding error messages in the log?
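
As a rough illustration of what `refCnt: 0` means, here is a minimal single-threaded sketch in plain Java. `CountedBuffer` and `RefCountSketch` are hypothetical stand-ins, not Netty or driver classes, and `IllegalStateException` stands in for Netty's `IllegalReferenceCountException`. The sketch only mimics the idea that a buffer rejects writes once its last reference has been released, which would be consistent with a decoder still running after the buffer was freed.

```java
/** Hypothetical stand-in for a reference-counted buffer; this is not Netty's ByteBuf. */
final class CountedBuffer {
    private final byte[] data = new byte[16];
    private int writerIndex;
    private int refCnt = 1; // a new buffer starts with one reference

    /** Drops one reference and reports whether the buffer is now freed. */
    boolean release() {
        ensureAccessible();
        refCnt--;
        return refCnt == 0;
    }

    /** Appends one byte, but only while the buffer is still referenced. */
    void writeByte(int value) {
        ensureAccessible();
        data[writerIndex++] = (byte) value;
    }

    private void ensureAccessible() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }
}

final class RefCountSketch {
    /** Writes to a buffer after its last reference was released and returns the error message. */
    static String writeAfterRelease() {
        CountedBuffer buffer = new CountedBuffer();
        buffer.writeByte(1);     // allowed: one reference is still held
        buffer.release();        // assumed teardown step that frees the buffer
        try {
            buffer.writeByte(2); // late activity touching the freed buffer
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterRelease());
    }
}
```

In this sketch the failure surfaces at the late write, not at the release, which is why the stack trace alone may not show what freed the buffer.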

mp911de, Jan 21 '22 10:01

I haven't seen anything. I'm waiting for another failure to check again.

dstepanov, Jan 26 '22 06:01

I'm getting a similar but not predictably repeatable error, which might be the same problem:

15:40:00.842 [reactor-tcp-nio-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	io.r2dbc.mssql.message.token.RowToken.doDecodePlp(RowToken.java:259)
	io.r2dbc.mssql.message.token.RowToken.decodeColumnData(RowToken.java:199)
	io.r2dbc.mssql.message.token.NbcRowToken.doDecode(NbcRowToken.java:123)
	io.r2dbc.mssql.message.token.NbcRowToken.decode(NbcRowToken.java:61)
	io.r2dbc.mssql.message.token.Tabular.lambda$decodeFunction$0(Tabular.java:203)
	io.r2dbc.mssql.message.token.Tabular$TabularDecoder.decode(Tabular.java:413)
	io.r2dbc.mssql.client.ConnectionState$4$1.decode(ConnectionState.java:206)
	io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:116)
	io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88)
	io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
	io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:832)

My stack is: My app > komapper > r2dbc-mssql > SQL Server

The stack trace doesn't touch my code, so I can't prove which function triggers it.

However, I think it is caused by reading from a Blob object: when I comment out the Blob-reading code, the error never occurs.

val buffer = ByteBuffer.allocate(65536)
blobObject.stream().collect<ByteBuffer>(buffer::put)

Note: I cache the result of the above code because Blobs can only be read once. I have no reason to believe that a second thread or coroutine tries to read before the result is cached.
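
One thing that might be worth ruling out separately: `ByteBuffer.put(ByteBuffer)` throws `BufferOverflowException` when the source chunk has more bytes remaining than the target, so a Blob larger than the 65536-byte buffer would fail partway through. A minimal sketch of size-independent accumulation in plain Java follows. `ChunkCollector` is a hypothetical helper, and the `Iterable<ByteBuffer>` stands in for chunks already received from `Blob.stream()`; nothing here is driver API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that joins ByteBuffer chunks without a fixed-size target buffer. */
final class ChunkCollector {

    /** Copies the remaining bytes of each chunk, in order, into a single array. */
    static byte[] collect(Iterable<ByteBuffer> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (ByteBuffer chunk : chunks) {
            // Work on a duplicate so the caller's position and limit stay untouched.
            ByteBuffer view = chunk.duplicate();
            byte[] copy = new byte[view.remaining()];
            view.get(copy);
            out.write(copy, 0, copy.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<ByteBuffer> chunks = Arrays.asList(
                ByteBuffer.wrap(new byte[] {1, 2, 3}),
                ByteBuffer.wrap(new byte[] {4, 5}));
        System.out.println(Arrays.toString(collect(chunks)));
    }
}
```

The resulting `byte[]` is an ordinary heap array, so it can be cached and read repeatedly without touching the Blob again.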

From io/r2dbc/spi/Blob.java

    /**
     * Returns the content stream as a {@link Publisher} emitting {@link ByteBuffer} chunks.
     * <p>
     * The content stream can be consumed ("subscribed to") only once.  Subsequent consumptions result in a {@link IllegalStateException}.
     * <p>
     * Once {@link Publisher#subscribe(Subscriber) subscribed}, {@link Subscription#cancel() canceling} the subscription releases resources associated with this {@link Blob}.
     *
     * @return a {@link Publisher} emitting {@link ByteBuffer} chunks.
     */
    Publisher<ByteBuffer> stream();
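
To make the "only once" part of that contract concrete, here is a minimal sketch in plain Java. `OneShotContent` is a hypothetical class, not part of the R2DBC SPI or the driver, and it uses a synchronous callback instead of a `Publisher`; it only illustrates the guard that rejects a second consumption with `IllegalStateException`.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical content holder that allows exactly one consumption. */
final class OneShotContent {
    private final List<ByteBuffer> chunks;
    private final AtomicBoolean consumed = new AtomicBoolean(false);

    OneShotContent(List<ByteBuffer> chunks) {
        this.chunks = chunks;
    }

    /** Hands every chunk to the consumer; any later call fails. */
    void consume(Consumer<ByteBuffer> consumer) {
        // compareAndSet lets exactly one caller win, even with concurrent attempts.
        if (!consumed.compareAndSet(false, true)) {
            throw new IllegalStateException("Content stream already consumed");
        }
        chunks.forEach(consumer);
    }

    /** Consumes the content twice and reports whether the second attempt was rejected. */
    static boolean secondConsumeFails() {
        OneShotContent content = new OneShotContent(
                Collections.singletonList(ByteBuffer.wrap(new byte[] {42})));
        content.consume(chunk -> { });
        try {
            content.consume(chunk -> { });
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondConsumeFails());
    }
}
```

If a second reader did race the cache, a guard of this kind would be expected to surface as an `IllegalStateException` rather than as a silent leak.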

I suspect that cancel() may not be called properly once the Blob has been fully read, but I can't figure out how to test this.

donard-commedagh, May 25 '22 15:05

The issue is still there:

2022-11-14T03:07:02.6207388Z       at io.micronaut.data.r2dbc.operations.DefaultR2dbcRepositoryOperations.blockOptional(DefaultR2dbcRepositoryOperations.java:213)
2022-11-14T03:07:02.6209345Z       at io.micronaut.data.operations.reactive.BlockingExecutorReactorRepositoryOperations.executeDelete(BlockingExecutorReactorRepositoryOperations.java:105)
2022-11-14T03:07:02.6210444Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:53)
2022-11-14T03:07:02.6248347Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:35)
2022-11-14T03:07:02.6249490Z       at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:81)
2022-11-14T03:07:02.6251260Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6252087Z       at io.micronaut.validation.ValidatingInterceptor.intercept(ValidatingInterceptor.java:143)
2022-11-14T03:07:02.6261824Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6262700Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanupData(AbstractRepositorySpec.groovy:164)
2022-11-14T03:07:02.6264233Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanup(AbstractRepositorySpec.groovy:176)
2022-11-14T03:07:02.6265126Z   Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
2022-11-14T03:07:02.6267089Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$23(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6267714Z       at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:698)
2022-11-14T03:07:02.6268345Z       at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6268952Z       at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:366)
2022-11-14T03:07:02.6269536Z       at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
2022-11-14T03:07:02.6270209Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321)
2022-11-14T03:07:02.6270891Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
2022-11-14T03:07:02.6271552Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274)
2022-11-14T03:07:02.6272189Z       at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
2022-11-14T03:07:02.6272761Z       at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:549)
2022-11-14T03:07:02.6273151Z       ... 69 more
2022-11-14T03:07:02.6283901Z   Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
2022-11-14T03:07:02.6284470Z       at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
2022-11-14T03:07:02.6285086Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.memoryAddress(PooledUnsafeDirectByteBuf.java:241)
2022-11-14T03:07:02.6285657Z       at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:524)
2022-11-14T03:07:02.6286219Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:193)
2022-11-14T03:07:02.6286761Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1104)
2022-11-14T03:07:02.6287242Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1096)
2022-11-14T03:07:02.6287712Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1087)
2022-11-14T03:07:02.6288225Z       at io.r2dbc.mssql.client.StreamDecoder$DecoderState.initial(StreamDecoder.java:180)
2022-11-14T03:07:02.6288736Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:85)
2022-11-14T03:07:02.6289228Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
2022-11-14T03:07:02.6289746Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
2022-11-14T03:07:02.6290366Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
2022-11-14T03:07:02.6290818Z       ... 73 more

dstepanov, Nov 14 '22 05:11

Hi everybody!

We are possibly facing the same problem, along with a memory leak report from Netty's leak detector.

Our setup uses r2dbc, r2dbc-pool, Reactor, and Spring, connecting to MS SQL Server.

Leak nag:

[2023-04-25T10:57:48.525Z] Apr 25, 2023 1:57:48 PM io.netty.util.ResourceLeakDetector reportTracedLeak
[2023-04-25T10:57:48.534Z] SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

Stack trace regarding refCnt:

Exception: IllegalReferenceCountException: refCnt: 0
Stack: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22)
	at com.microsoft.azure.functions.worker.broker.EnhancedJavaMethodExecutorImpl.execute(EnhancedJavaMethodExecutorImpl.java:22)
	at com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware.invoke(FunctionExecutionMiddleware.java:19)
	at com.microsoft.azure.functions.worker.chain.InvocationChain.doNext(InvocationChain.java:21)
	at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:125)
	at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:34)
	at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10)
	at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:44)
	at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:94)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [< secrets >]; null
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
	reactor.core.publisher.Flux.onErrorMap
	org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
Error has been observed at the following site(s):
	*_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
	|_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:85)
	|_          Flux.name ⇢ at < secrets >
	|_           Flux.tag ⇢ at
	|_       Flux.metrics ⇢ at
	|_           Flux.map ⇢ at
	*_________Flux.concat ⇢ at
	|_          Flux.name ⇢ at
	|_           Flux.tag ⇢ at
	|_       Flux.metrics ⇢ at
	|_       Flux.flatMap ⇢ at
	|_       Flux.flatMap ⇢ at
	|_     Flux.publishOn ⇢ at
	|_       Flux.flatMap ⇢ at
	|_       Flux.flatMap ⇢ at
	|_           Flux.map ⇢ at
	|_ Flux.switchIfEmpty ⇢ at
	*________Flux.flatMap ⇢ at
	|_ Flux.switchIfEmpty ⇢ at
	*________Flux.flatMap ⇢ at
	|_        Flux.filter ⇢ at
	|_          Flux.sort ⇢ at
	|_          Flux.skip ⇢ at
	|_          Flux.take ⇢ at
	|_                    ⇢ at
	|_     Flux.doOnError ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:834)
	|_          Flux.from ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
	|_           Flux.map ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
Original Stack Trace:
		at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
		at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:150)
		at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:7123)
		at reactor.core.publisher.Flux.lambda$onErrorResume$29(Flux.java:7176)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398)
		at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:200)
		at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:145)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
		at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
		at reactor.core.publisher.Operators.error(Operators.java:198)
		at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
		at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
		at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:488)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:421)
		at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:185)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:392)
		at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:527)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
		at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:531)
		at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:761)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:873)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:238)
		at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
		at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
		at reactor.core.publisher.Operators.error(Operators.java:198)
		at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:384)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:540)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:488)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:432)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:312)
		at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMainAndComplete(FluxTakeUntilOther.java:200)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilOtherSubscriber.onComplete(FluxTakeUntilOther.java:128)
		at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
		at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
		at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
		at io.r2dbc.mssql.RpcQueryMessageFlow$OnCursorComplete.run(RpcQueryMessageFlow.java:686)
		at io.r2dbc.mssql.RpcQueryMessageFlow.onDone(RpcQueryMessageFlow.java:387)
		at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:377)
		at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:315)
		at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$1(RpcQueryMessageFlow.java:141)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:488)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:504)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
		at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:471)
		at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:269)
		at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
		at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
		at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:295)
		at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:214)
		at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:326)
		at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:315)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
		at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
	at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxPeek] :
	reactor.core.publisher.Flux.doOnSubscribe
	io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
Error has been observed at the following site(s):
	*_____Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
	*_______Mono.flatMapMany ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
	|_           Flux.handle ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
	|_ Flux.doAfterTerminate ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_        Flux.doFinally ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_           Flux.handle ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:137)
	|_           Flux.filter ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:143)
	|_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:144)
	|_    Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:146)
	|_                       ⇢ at io.r2dbc.mssql.util.Operators.discardOnCancel(Operators.java:60)
	|_      Flux.doOnDiscard ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$3(RpcQueryMessageFlow.java:149)
	|_        Flux.transform ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
	|_   Flux.takeUntilOther ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
	|_                       ⇢ at io.r2dbc.mssql.MssqlStatementSupport.potentiallyAttachTimeout(MssqlStatementSupport.java:100)
	|_      Flux.windowUntil ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
	|_              Flux.map ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
	*_____________Flux.defer ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.execute(ParametrizedMssqlStatement.java:119)
	|_             Flux.from ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:398)
	|_             Flux.cast ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:399)
	|_            checkpoint ⇢ SQL  < secrets > [DatabaseClient]
	|_          Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$1(DefaultFetchSpec.java:87)
	*_________Flux.usingWhen ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:137)
Original Stack Trace:
		at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
		at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:702)
		at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:692)
		at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:370)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:303)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:483)
		at reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:615)
		at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:602)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:140)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
		at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:115)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.request(FluxTakeUntilOther.java:182)
		at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:684)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:835)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
		at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
		at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
		at reactor.core.publisher < clipped by Azure ... >

aironi avatar Apr 26 '23 09:04 aironi

Hi all!

I believe this problem would be corrected by moving handleConnectionError(throwable) from line 370 to line 381:

https://github.com/r2dbc/r2dbc-mssql/blob/main/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java#L370

Please confirm.

aironi avatar Nov 20 '23 11:11 aironi

Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.

Snapshot builds are available now, please retest.
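
To make the ordering argument concrete, here is a minimal plain-Java sketch. It is a hypothetical model and not the driver's actual code: the class `ShutdownOrderSketch` and the methods `awaitResponse`, `submit` and `failConnection` are invented for illustration. It assumes a pending response handler reacts to the failure by submitting a new request on the same client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the ordering question; none of these names exist in r2dbc-mssql.
public class ShutdownOrderSketch {

    private final Deque<Consumer<Throwable>> pendingHandlers = new ArrayDeque<>();
    private final List<String> acceptedAfterError = new ArrayList<>();
    private boolean requestQueueCompleted = false;
    private boolean failing = false;

    /** Registers a handler that is still waiting for a response. */
    public void awaitResponse(Consumer<Throwable> onError) {
        pendingHandlers.add(onError);
    }

    /** Tries to enqueue a request; returns false once the request queue is completed. */
    public boolean submit(String request) {
        if (requestQueueCompleted) {
            return false;
        }
        if (failing) {
            // Accepted although the connection is already failing: this is the leak.
            acceptedAfterError.add(request);
        }
        return true;
    }

    /** Fails the connection, either completing the request queue first or last. */
    public void failConnection(Throwable cause, boolean completeQueueFirst) {
        failing = true;
        if (completeQueueFirst) {
            requestQueueCompleted = true;
        }
        Consumer<Throwable> handler;
        while ((handler = pendingHandlers.poll()) != null) {
            handler.accept(cause); // discard pending response handlers
        }
        requestQueueCompleted = true;
    }

    public List<String> acceptedAfterError() {
        return acceptedAfterError;
    }

    public static void main(String[] args) {
        for (boolean completeFirst : new boolean[] {false, true}) {
            ShutdownOrderSketch client = new ShutdownOrderSketch();
            // A handler that reacts to the failure by retrying on the same client.
            client.awaitResponse(err -> client.submit("retry"));
            client.failConnection(new IllegalStateException("connection closed"), completeFirst);
            System.out.println("completeQueueFirst=" + completeFirst
                    + " acceptedAfterError=" + client.acceptedAfterError());
        }
    }
}
```

In this model, discarding the handlers first lets the retry slip in (`acceptedAfterError=[retry]`), while completing the queue first rejects it (`acceptedAfterError=[]`). Whether the real driver can hit such a window depends on its actual threading and operator chain, which this sketch does not reproduce.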

mp911de avatar Nov 20 '23 12:11 mp911de

> Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.
>
> Snapshot builds are available now, please retest.

Thanks, I tested this with a custom snapshot that I built locally and the refCnt error disappeared. Instead I got the "connection closed" stack trace. We are still not sure why the connection gets closed. We perform a lot of queries against SQL Server and have only one troublesome use case, where a lot of data is fetched with multiple complex queries. But that seems to be a different problem, and this bug only hid the actual stack trace. Anyway, since I am not familiar with this package, I was not completely sure whether this was a proper fix, hence the comment instead of a PR :slightly_smiling_face:

I will test with your snapshot as soon as possible! Thanks again!

aironi avatar Nov 22 '23 06:11 aironi

> Instead I got the "connection closed" stack trace.

I think this is actually an improvement. Connection closed can happen when there is too much pressure on the connection (i.e. server TCP recv buffers exhausted). Can you check how much activity is going on, especially parallel activity? Switching to concatMap instead of using flatMap or large prefetch sizes should get you there.

Once I get the confirmation from your side that the fix is appropriate, I'll close the ticket here.
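
As a rough illustration of the concatMap suggestion, the sketch below compares how many calls are in flight at once with `flatMap` and with `concatMap`. It assumes reactor-core is on the classpath. `fakeQuery` is a hypothetical stand-in for a real R2DBC call, and the 20 ms delay and the range of 8 ids are arbitrary values chosen for the demo.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapSketch {

    // Hypothetical stand-in for a database call; it only tracks how many calls overlap.
    static Mono<Integer> fakeQuery(int id, AtomicInteger inFlight, AtomicInteger peak) {
        return Mono.defer(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            return Mono.just(id)
                    .delayElement(Duration.ofMillis(20))
                    // Decrement before the completion signal travels downstream.
                    .doOnTerminate(inFlight::decrementAndGet);
        });
    }

    // Runs 8 fake queries and returns the highest number that were active at the same time.
    static int peakConcurrency(boolean sequential) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux<Integer> ids = Flux.range(1, 8);
        Flux<Integer> results = sequential
                ? ids.concatMap(id -> fakeQuery(id, inFlight, peak)) // one inner publisher at a time
                : ids.flatMap(id -> fakeQuery(id, inFlight, peak));  // inner publishers subscribed eagerly
        results.blockLast();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap peak:   " + peakConcurrency(false));
        System.out.println("concatMap peak: " + peakConcurrency(true));
    }
}
```

With `concatMap` the peak stays at 1, because the next inner publisher is subscribed only after the previous one completes. With `flatMap` it is greater than 1. If some parallelism is still wanted, `flatMap` also accepts a concurrency argument, for example `flatMap(fn, 2)`, to cap the number of concurrent inner subscriptions.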

mp911de avatar Nov 22 '23 08:11 mp911de

Bad news: I tried my use case with 1.1.0-BUILD-SNAPSHOT, but it seems that I am hitting #276, and to my old eyes it looks like the queries get mixed up between connections, as mentioned in #273. I am using a pool.

Good news: I could not see the refCnt problem with this version.

For me, the error is the following (real parameter names renamed):

[2023-11-22T17:54:19.789Z] Exception: ExceptionFactory.MssqlNonTransientException: The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
...

[2023-11-22T17:54:19.790Z] Caused by: org.springframework.r2dbc.UncategorizedR2dbcException: executeMany; SQL [SELECT TOP(1) * FROM TABLE WITH(NOLOCK) WHERE ActualId = (:actualId)]; The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
[2023-11-22T17:54:19.790Z] 	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:245)
[2023-11-22T17:54:19.790Z] 	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
[2023-11-22T17:54:19.791Z] Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
[2023-11-22T17:54:19.791Z] 	reactor.core.publisher.Flux.onErrorMap
[2023-11-22T17:54:19.791Z] 	org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z] Error has been observed at the following site(s):
[2023-11-22T17:54:19.791Z] 	*_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z] 	|_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:83)
[2023-11-22T17:54:19.791Z] 	|_        Flux.buffer ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:62)
[2023-11-22T17:54:19.791Z] 	|_       Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:63)
[2023-11-22T17:54:19.792Z] 	|_          Flux.next ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:73)
[2023-11-22T17:54:19.792Z] 	|_          Mono.name ⇢ at com.mycode.MyRepositoryImpl.findXyzSql(MyRepositoryImpl.java:123)

BUT, the query in question does not even specify the anotherId in SQL.

I will also comment on this in #276.

We are using 1.0.0.RELEASE in our released solution at the moment.

aironi avatar Nov 22 '23 18:11 aironi

Thanks a lot for confirming. Closing this one as done.

The other issues are different ones that we need to sort out.

mp911de avatar Nov 23 '23 08:11 mp911de