rsocket-kotlin
JobCancellationException after client closes connection
When using ktor with a requestStream handler like this:
fun Application.module() {
    install(WebSockets)
    install(RSocketSupport) {
        server = RSocketServer()
    }
    routing {
        rSocket("stream") {
            RSocketRequestHandler {
                log.info("New subscriber")
                requestStream {
                    flow {
                        repeat(2) { i ->
                            log.info("Emit $i")
                            emit(buildPayload { data("data: $i") })
                        }
                    }
                }
            }
        }
    }
}
I get the following exception after the client closes the connection:
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine is cancelling
Caused by: kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
2022-02-08 13:16:07.904 [eventLoopGroupProxy-4-1] ERROR Application - Unhandled exception caught for CoroutineName(call-handler)
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
I'm executing the following code on the client side:
runBlocking {
    val client = HttpClient(OkHttp) {
        install(WebSockets)
        install(RSocketSupport)
    }
    val stream = client
        .rSocket("ws://localhost:8080/stream")
        .requestStream(Payload.Empty)
    stream
        .take(2)
        .toList()
    client.close()
}
The full example project to reproduce the error: https://github.com/Alexander-N/ktor-rsocket-error
Hey, if I understand correctly, you get the error on the server side, right? From the stack trace, I see that the error is thrown from the ktor-websockets implementation, not from rsocket-kotlin. Maybe we could try to close the websocket safely (via a close frame) inside the rsocket-kotlin implementation, but it looks like an issue with exception reporting on the ktor side.
I will do some tests later to check whether ktor does the same with plain websockets, without rsocket-kotlin on top.
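A plain-websocket check like the one described above could be sketched roughly as follows. This is only an illustration, not the test the maintainer ran: it assumes the Ktor 1.6.x package layout visible in the stack trace (`io.ktor.http.cio.websocket`; Ktor 2.x moved these packages), and the module name `plainWebSocketModule` and the route `plain` are hypothetical.

```kotlin
import io.ktor.application.Application
import io.ktor.application.install
import io.ktor.http.cio.websocket.Frame
import io.ktor.routing.routing
import io.ktor.websocket.WebSockets
import io.ktor.websocket.webSocket

// Hypothetical module: mirrors the "stream" route, but without rsocket-kotlin on top.
fun Application.plainWebSocketModule() {
    install(WebSockets)
    routing {
        // "plain" is an assumed route name, not part of the original project.
        webSocket("plain") {
            // Send the same two items the RSocket handler emits.
            repeat(2) { i ->
                outgoing.send(Frame.Text("data: $i"))
            }
            // Keep the session open until the client disconnects;
            // incoming frames are ignored in this sketch.
            for (frame in incoming) {
            }
        }
    }
}
```

If the same "Channel was closed" error shows up in the server log when a client disconnects from this route, that would point at ktor-websockets rather than rsocket-kotlin.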
Thanks for looking into this! Yes, the error is on the server side. If the websocket is not closed, then ktor-websockets seems correct in throwing this exception, so I'm not sure if it's worth reporting this issue upstream.
The main question is what happens on the ktor side when the HttpClient is closed: will it try to close the open websocket session safely (via a close frame), or will it just close the underlying connection? And, given that, what is the expected behavior (by design) in ktor? I will take a look later and answer here, and if needed I will make a fix or create an issue in the ktor repository.
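For comparison, here is a minimal sketch of a plain Ktor websocket client that sends a close frame explicitly before the HttpClient is closed. It assumes Ktor 1.6.x package names and a hypothetical `ws://localhost:8080/plain` endpoint that sends two text frames. Whether an explicit close avoids the server-side exception is not confirmed anywhere in this thread.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.okhttp.OkHttp
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.webSocket
import io.ktor.http.cio.websocket.CloseReason
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.close
import io.ktor.http.cio.websocket.readText
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val client = HttpClient(OkHttp) {
        install(WebSockets)
    }
    // Assumed endpoint: a plain websocket route that sends two text frames.
    client.webSocket("ws://localhost:8080/plain") {
        repeat(2) {
            val frame = incoming.receive()
            if (frame is Frame.Text) println(frame.readText())
        }
        // Send a close frame so the server sees a normal websocket shutdown
        // instead of only a dropped connection.
        close(CloseReason(CloseReason.Codes.NORMAL, "done"))
    }
    // Release the engine only after the websocket session has finished.
    client.close()
}
```

Comparing the server log for this client against one that calls `client.close()` while the session is still open would show which shutdown path triggers the error.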
That would be great. Are you still planning to look into it, @olme04?
Yes, I will look into it in the coming days.
This will be fixed on the ktor side: https://youtrack.jetbrains.com/issue/KTOR-5016/Channel-was-closed-exception-in-WebSocket-during-normal-use