
JobCancellationException after client closes connection

Open Alexander-N opened this issue 3 years ago • 6 comments

When using ktor with a requestStream handler like this:

fun Application.module() {
    install(WebSockets)
    install(RSocketSupport) {
        server = RSocketServer()
    }

    routing {
        rSocket("stream") {
            RSocketRequestHandler {
                log.info("New subscriber")
                requestStream {
                    flow {
                        repeat(2) { i ->
                            log.info("Emit $i")
                            emit(buildPayload { data("data: $i") })
                        }
                    }
                }
            }
        }
    }
}

I get the following exception after the client closes the connection:

kotlinx.coroutines.JobCancellationException: StandaloneCoroutine is cancelling
Caused by: kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
        at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
        at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
        at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
        at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-02-08 13:16:07.904 [eventLoopGroupProxy-4-1] ERROR Application - Unhandled exception caught for CoroutineName(call-handler)
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
        at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
        at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
        at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
        at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

I'm executing the following code on the client side:

runBlocking {
    val client = HttpClient(OkHttp) {
        install(WebSockets)
        install(RSocketSupport)
    }
    val stream = client
        .rSocket("ws://localhost:8080/stream")
        .requestStream(Payload.Empty)
    stream
        .take(2)
        .toList()
    client.close()
}

The full example project to reproduce the error: https://github.com/Alexander-N/ktor-rsocket-error

Alexander-N avatar Feb 08 '22 13:02 Alexander-N

Hey, if I understand correctly, you get the error on the server side, right? From the stack trace I can see that the error is thrown from the ktor-websockets implementation, not from rsocket-kotlin. We could try, on our (rsocket-kotlin) side, to close the websocket safely (via a close frame) inside the implementation, but this looks like an issue with exception reporting on the ktor side. I will run some tests later to check whether ktor does the same when using plain websockets without rsocket-kotlin on top.

olme04 avatar Feb 10 '22 09:02 olme04

Thanks for looking into this! Yes, the error is on the server side. If the websocket is not closed, then ktor-websockets seems correct in throwing this exception, so I'm not sure whether it's worth reporting this issue upstream.

Alexander-N avatar Feb 10 '22 10:02 Alexander-N

The main question is what happens on the ktor side when the HttpClient is closed: will it try to close the open websocket session safely (via a close frame), or will it just close the underlying connection? And, given that, what is the expected behavior (by design) in ktor? I will take a look later and post an answer here, and if needed I will make a fix or create an issue in the ktor repository.

olme04 avatar Feb 10 '22 13:02 olme04

That would be great. Are you still planning to look into it, @olme04?

Alexander-N avatar Feb 22 '22 07:02 Alexander-N

Yes, I will look into it in the coming days.

olme04 avatar Feb 22 '22 12:02 olme04

This will be fixed on the ktor side: https://youtrack.jetbrains.com/issue/KTOR-5016/Channel-was-closed-exception-in-WebSocket-during-normal-use

whyoleg avatar Oct 18 '22 18:10 whyoleg
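
For readers following the stack trace above: the `ClosedReceiveChannelException` is what kotlinx.coroutines raises when a suspended `receive()` is resumed because its channel was closed without a cause. The sketch below reproduces only that mechanism with a plain `Channel`. It is not the ktor or rsocket-kotlin code path, and `readUntilClosed` and the `incoming` variable are hypothetical names chosen for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a websocket session's `incoming` frame channel.
fun readUntilClosed(): List<String> = runBlocking {
    val incoming = Channel<String>()
    val events = mutableListOf<String>()

    // Reader loop that uses receive(), which throws once the channel is closed.
    val reader = launch {
        try {
            while (true) {
                events += "received ${incoming.receive()}"
            }
        } catch (e: ClosedReceiveChannelException) {
            // Treat a closed channel as a normal end of the stream
            // instead of letting it surface as an unhandled exception.
            events += "closed"
        }
    }

    incoming.send("frame 0") // delivered to the reader
    incoming.close()         // resumes the suspended receive() with the exception
    reader.join()
    events
}

fun main() {
    println(readUntilClosed())
}
```

Iterating with `for (frame in incoming)` instead of calling `receive()` ends the loop without an exception when the channel is closed without a cause, which is one way a reader can avoid this error. Whether that is appropriate for the code in this thread is an assumption, not something the discussion confirms.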