r2dbc-mssql
r2dbc-mssql copied to clipboard
Parameterized Statements throws exception while using Kotlin Flow
Bug Report
Versions
- Driver: r2dbc-mssql-0.8.8.RELEASE
- Database: Microsoft SQL Server 2019 (RTM-CU15) (KB5008996) - 15.0.4198.2 (X64)
- Java: temurin-11.0.15
- Kotlin: 1.6.21
- Spring Boot: 2.6.7
- OS: Linux 5.17.5-arch1-1
Current Behavior
While collecting from the following flow, it randomly closes connection and cancels the flow. It happens when i try fetch too many rows. In my case that is 34k rows. Also, rarely it collects all rows without error.
fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
Stack trace
org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id]; null; nested exception is io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:226) ~[spring-r2dbc-5.3.19.jar:5.3.19]
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:147) ~[spring-r2dbc-5.3.19.jar:5.3.19]
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:6946) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:6999) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:198) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException: null
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ SQL "SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id" [DatabaseClient]
Original Stack Trace:
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:317) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid header type: 0x0
at io.r2dbc.mssql.client.StreamDecoder$ListSink.error(StreamDecoder.java:350) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:135) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:255) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Invalid header type: 0x0
at io.r2dbc.mssql.message.header.Type.valueOf(Type.java:68) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.message.header.Header.decode(Header.java:215) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.StreamDecoder$DecoderState.readChunk(StreamDecoder.java:289) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:112) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
... 53 common frames omitted
Table schema
Input Code
data class LegacyCustomer(
@Id @Column("Id") val id: Long? = null,
@Column("MobilePhone") val mobilePhone: String?,
@Column("FirstName") var firstName: String? = null,
@Column("LastName") var lastName: String? = null,
@Column("DMSId") var dmsId: String? = null,
@Column("BirthDate") var birthDate: LocalDate? = null,
@Column("Email") var email: String? = null,
@Column("Gender") var gender: Int? = null,
@Column("OccupationId") var occupationId: Long? = null,
@Column("SegmentId") var segmentId: Long? = null,
@Column("LFCustomerId") var customerId: String? = null,
@CreatedDate @Column("CreateDate") var createDate: Instant? = null,
@LastModifiedDate @Column("UpdateDate") var updateDate: Instant? = null
)
Steps to reproduce
Input Code
interface LegacyCustomerRepository : CoroutineCrudRepository<LegacyCustomer, Long> {
fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
}
@Service
class LegacyCustomerService(private val legacyCustomerRepository: LegacyCustomerRepository){
suspend fun createCustomerMigration(customerMigrationRequest: CustomerMigrationRequest) {
with(customerMigrationRequest) {
logger.info("Legacy customer migration started from id: $startFromId to id: $endToId.")
legacyCustomerRepository.findAllByIdBetween(startFromId, endToId)
.onCompletion {
if (it == null) logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.", it)
}
.catch {
logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
}
.onEach {
try {
logger.info("Migrating customer with id: ${it.id}")
// ... another suspend method call
} catch (exception: Throwable) {
logger.error("Migrating failed for customer with id: ${it.id}", exception)
}
}.launchIn(CoroutineScope(Dispatchers.IO))
}
}
}
Expected behavior/code
Flow should be collected without errors as that is while using Reactor Flux. I do not face with the error if i use Flux and subscribe it:
fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
legacyCustomerRepo.findAllByIdBetween(startFromId, endToId)
.doAfterTerminate {
logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.")
}
.doOnError {
logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
}
.subscribe {
try {
logger.info("Migrating customer with id: ${it.id}")
runBlocking {
// ... another suspend method call
}
} catch (exception: Throwable) {
logger.error("Migrating failed for customer with id: ${it.id}", exception)
}
}
Possible Solution
When i used cursored exchange with fetch size as 128 instead of direct exchange, it worked as expected. But had faced with following prepared statement cache error when i called second time:
Could not find prepared statement with handle 1073741825
Additional context
This looks like a protocol offset error. There are a lot of moving parts involved. Can you provide a minimal reproducer using R2DBC MSSQL code only (without the use of Spring)? Otherwise, it will be next to impossible to diagnose the issue.
Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.
Hi @mp911de, this error (Could not find prepared statement with handle 1073741825) now pops up again in version 1.0.1 when using prepared statements (bind method). First sql after startup runs fine, but subsequent sqls fail.
The error is not present in version 1.0.0, and the only code change I can see which has anything to do with prepared statements seems to be the removal of the no-param constructor in ConnectionOptions which was part of the commit which resolved issue 267.
We use bind a lot, so if it is possible to resolve this in the 1.x branch it would be much appreciated as we need to use Spring Boot 3 which is wired to the 1.x branch.