spring-framework icon indicating copy to clipboard operation
spring-framework copied to clipboard

WebClient logs "IllegalStateException: Only one connection receive subscriber allowed" when multiple requests made within Kotlin coroutine and response status is an error

Open gtg465x opened this issue 3 years ago • 1 comments
trafficstars

Kotlin version: 1.5.30 Spring Boot version: 2.5.4 and 2.6.1 Spring Framework version: 5.3.9

When making multiple WebClient requests within a Kotlin coroutine, if one of them fails with a 4xx or 5xx status, there are two errors logged... an expected WebClientResponseException and an unexpected IllegalStateException: Only one connection receive subscriber allowed from Reactor. A unit test to reproduce the behavior along with the full stack trace are posted below.

There may be other ways to reproduce this error, but in the case below, it only happens if the request is expecting a body and the 4xx response does not contain one. Also, if you comment out the /path1 request, the /path2 request fails normally without the additional Reactor exception, even if the 4xx response doesn't contain a body.

Unit test (spring-cloud-contract-wiremock is used to mock the server):

@SpringBootTest
@AutoConfigureWireMock(port = 8089)
class WebClientTest {

    private lateinit var webClient: WebClient

    @BeforeEach
    fun setup() {
        webClient = WebClient.builder().baseUrl("http://localhost:8089").build()

        stubFor(
            get(urlPathEqualTo("/path1")).willReturn(
                aResponse().withStatus(200)
                    .withHeader("Content-Type", "application/json")
                    .withBody("""{"data":""}""")
            )
        )
        stubFor(
            get(urlPathEqualTo("/path2")).willReturn(
                aResponse().withStatus(400)
            )
        )
    }

    @Test
    fun test(): Unit = runBlocking {
        val result1 = async(Dispatchers.IO) {
            webClient.get().uri("/path1").retrieve().awaitBody<Response>()
        }
        val result2 = async(Dispatchers.IO) {
            webClient.get().uri("/path2").retrieve().awaitBody<Response>()
        }
        Pair(result1.await(), result2.await())
    }
}

data class Response(val data: String)

Stack trace:

2021-12-09 10:30:32.618 ERROR 55926 --- [ctor-http-nio-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182)
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
	at reactor.core.publisher.Operators.error(Operators.java:198)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167)
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160)
	at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:201)
	at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:457)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:628)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)


org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from GET http://localhost:8089/path2

	at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:196)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ 400 from GET http://localhost:8089/path2 [DefaultWebClient]
Stack trace:
		at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:196)
		at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:213)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
		at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onComplete(FluxDefaultIfEmpty.java:109)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:344)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
		at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
		at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
		at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
		at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
		at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:684)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.lang.Thread.run(Thread.java:748)

gtg465x avatar Dec 09 '21 15:12 gtg465x

@spring-projects-issues

Garnerye avatar Oct 28 '22 02:10 Garnerye

looks similar to https://github.com/spring-projects/spring-framework/issues/26699

nkonev avatar Sep 08 '23 10:09 nkonev

Indeed, thanks for spotting this.

sdeleuze avatar Sep 08 '23 11:09 sdeleuze