kotlinx.coroutines
kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation
Describe the bug
The following test works on Android, but crashes on iOS:
@Test
fun reproduce() = runBlocking {
    val dispatcher = newSingleThreadContext("MyDispatcher")
    launch(dispatcher) {
        try {
            withContext(Dispatchers.Default) {
                println("!!! before delay")
                delay(1000)
                println("!!! after delay")
            }
        } catch (e: Exception) {
            println("!!! Ignored exception: $e")
        }
    }
    delay(200)
    println("!!! before close")
    dispatcher.close()
    println("!!! after close")
    println("!!! DONE !!!")
}
Output from Android Emulator Arm64 (no crashes, everything works as expected):
I !!! before delay
I !!! before close
I !!! after close
I !!! DONE !!!
I !!! after delay
I !!! Ignored exception: java.util.concurrent.CancellationException: The task was rejected
run finished: 1 tests, 0 failed, 0 ignored
Output from iOS Simulator Arm64 (test crashed):
1 test completed, 1 failed
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':sync-android-kt:iosSimulatorArm64Test'.
> Test running process exited unexpectedly.
Current test: reproduce
Process output:
!!! before delay
!!! before close
!!! after close
!!! DONE !!!
!!! after delay
kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
at 0 test.kexe 0x102735883 kfun:kotlin.Error#<init>(kotlin.String?;kotlin.Throwable?){} + 143 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:14:63)
at 1 test.kexe 0x10298ba27 kfun:kotlinx.coroutines.CoroutinesInternalError#<init>(kotlin.String;kotlin.Throwable){} + 123 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/Exceptions.common.kt:23:77)
at 2 test.kexe 0x1029f242f kfun:kotlinx.coroutines.DispatchedTask#handleFatalException(kotlin.Throwable?;kotlin.Throwable?){} + 755 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:142:22)
at 3 test.kexe 0x1029f2113 kfun:kotlinx.coroutines.DispatchedTask#run(){} + 2455 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:113:13)
at 4 test.kexe 0x102a1b9bb kfun:kotlinx.coroutines.Runnable#run(){}-trampoline + 91 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Runnable.kt:10:19)
at 5 test.kexe 0x102a17067 kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.dispatch$lambda$0#internal + 119 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:22:23)
at 6 test.kexe 0x102a170c3 kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.$dispatch$lambda$0$FUNCTION_REFERENCE$0.invoke#internal + 71 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
at 7 test.kexe 0x102a17193 kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.$dispatch$lambda$0$FUNCTION_REFERENCE$0.$<bridge-UNN>invoke(){}#internal + 71 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
at 8 test.kexe 0x102865807 kfun:kotlin.Function0#invoke(){}1:0-trampoline + 99 (/Users/teamcity/.gradle/daemon/8.2.1/[K][Suspend]Functions:1:1)
at 9 test.kexe 0x102a18d13 _6f72672e6a6574627261696e732e6b6f746c696e783a6b6f746c696e782d636f726f7574696e65732d636f72652f6f70742f6275696c644167656e742f776f726b2f343465633665383530643563363366302f6b6f746c696e782d636f726f7574696e65732d636f72652f6e617469766544617277696e2f7372632f44697370617463686572732e6b74_knbridge8 + 191 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
at 10 libdispatch.dylib 0x18016b4f3 _dispatch_call_block_and_release + 23
at 11 libdispatch.dylib 0x18016cd3b _dispatch_client_callout + 15
at 12 libdispatch.dylib 0x18017f5d3 _dispatch_root_queue_drain + 1075
at 13 libdispatch.dylib 0x18017fd57 _dispatch_worker_thread2 + 231
at 14 libsystem_pthread.dylib 0x1045cf8e7 _pthread_wqthread + 223
at 15 libsystem_pthread.dylib 0x1045ce6e3 start_wqthread + 7
Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [DispatchedCoroutine{Completed}@4661750, DarwinGlobalQueueDispatcher@53c0308]
at 0 test.kexe 0x10273c19b kfun:kotlin.Throwable#<init>(kotlin.String?){} + 119 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Throwable.kt:28:37)
at 1 test.kexe 0x102735977 kfun:kotlin.Exception#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:44)
Uncaught Kotlin exception: at 2 test.kexe 0x102735b97 kfun:kotlin.RuntimeException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:44)
at 3 test.kexe 0x102a15593 kfun:kotlinx.coroutines.internal.DiagnosticCoroutineContextException#<init>(kotlin.coroutines.CoroutineContext){} + 167 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/internal/CoroutineExceptionHandlerImpl.kt:27:5)
at 4 test.kexe 0x1029ee17f kfun:kotlinx.coroutines.internal#handleUncaughtCoroutineException(kotlin.coroutines.CoroutineContext;kotlin.Throwable){} + 647 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/CoroutineExceptionHandlerImpl.common.kt:43:33)
at 5 test.kexe 0x1029843ab kfun:kotlinx.coroutines#handleCoroutineException(kotlin.coroutines.CoroutineContext;kotlin.Throwable){} + 515 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt:28:5)
at 6 test.kexe 0x1029f247b kfun:kotlinx.coroutines.DispatchedTask#handleFatalException(kotlin.Throwable?;kotlin.Throwable?){} + 831 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:144:9)
at 7 test.kexe 0x1029f2113 kfun:kotlinx.coroutines.DispatchedTask#run(){} + 2455 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:113:13)
... and 12 more common stack frames skipped
Caused by: kotlin.IllegalStateException: Dispatcher MyDispatcher was closed, attempted to schedule: DispatchedContinuation[MultiWorkerDispatcher@4584120, Continuation @ 7]
at 0 test.kexe 0x10273c19b kfun:kotlin.Throwable#<init>(kotlin.String?){} + 119 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Throwable.kt:28:37)
at 1 test.kexe 0x102735977 kfun:kotlin.Exception#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:44)
at 2 test.kexe 0x102735b97 kfun:kotlin.RuntimeException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:44)
at 3 test.kexe 0x1027361bf kfun:kotlin.IllegalStateException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:70:44)
at 4 test.kexe 0x102a1243f kfun:kotlinx.coroutines.MultiWorkerDispatcher.dispatch#internal + 819 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt:128:23)
at 5 test.kexe 0x102a1aad7 kfun:kotlinx.coroutines.CoroutineDispatcher#dispatch(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}-trampoline + 67 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:<unknown>)
at 6 test.kexe 0x1029f08f7 kfun:kotlinx.coroutines.internal#resumeCancellableWith__at__kotlin.coroutines.Continuation<0:0>(kotlin.Result<0:0>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){0§<kotlin.Any?>} + 1019 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt:278:64)
at 7 test.kexe 0x1029f0f7b kfun:kotlinx.coroutines.internal#resumeCancellableWith$default__at__kotlin.coroutines.Continuation<0:0>(kotlin.Result<0:0>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?;kotlin.Int){0§<kotlin.Any?>} + 287 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt:274:8)
at 8 test.kexe 0x102978e77 kfun:kotlinx.coroutines.DispatchedCoroutine#afterResume(kotlin.Any?){} + 311 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/Builders.common.kt:257:29)
at 9 test.kexe 0x102a190ab kfun:kotlinx.coroutines.AbstractCoroutine#afterResume(kotlin.Any?){}-trampoline + 59 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt:<unknown>)
at 10 test.kexe 0x102976a8b kfun:kotlinx.coroutines.AbstractCoroutine#resumeWith(kotlin.Result<1:0>){} + 303 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt:99:9)
at 11 test.kexe 0x102868e03 kfun:kotlin.coroutines.Continuation#resumeWith(kotlin.Result<1:0>){}-trampoline + 99 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/libraries/stdlib/src/kotlin/coroutines/Continuation.kt:26:12)
at 12 test.kexe 0x102740fd7 kfun:kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(kotlin.Result<kotlin.Any?>){} + 1163 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/coroutines/ContinuationImpl.kt:43:32)
at 13 test.kexe 0x102868e03 kfun:kotlin.coroutines.Continuation#resumeWith(kotlin.Result<1:0>){}-trampoline + 99 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/libraries/stdlib/src/kotlin/coroutines/Continuation.kt:26:12)
at 14 test.kexe 0x1029f1ed3 kfun:kotlinx.coroutines.DispatchedTask#run(){} + 1879 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:104:71)
at 15 test.kexe 0x102a1b9bb kfun:kotlinx.coroutines.Runnable#run(){}-trampoline + 91 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Runnable.kt:10:19)
... and 11 more common stack frames skipped
Child process terminated with signal 6: Abort trap
* Try:
> Run with --stacktrace option to get the stack trace.
> Run with --info or --debug option to get more log output.
> Run with --scan to get full insights.
* Get more help at https://help.gradle.org
BUILD FAILED in 1m 3s
The root cause may be that on iOS an IllegalStateException is thrown when control returns from the withContext(Dispatchers.Default) block, whereas on Android, as the output above shows, a CancellationException is thrown at the same point.
Versions
androidGradlePluginVersion = "8.1.2"
kotlinVersion = '1.9.23'
kotlinCoroutinesVersion = "1.8.0"
skieVersion = "0.6.2"
classpath "com.android.tools.build:gradle:$androidGradlePluginVersion"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
plugins {
id 'org.jetbrains.kotlin.multiplatform'
id 'com.android.library'
id "co.touchlab.skie" version "$skieVersion"
}
Interestingly, for me, this test crashes on the JVM as well.
It also crashes for me on the JVM. On Android, it's either timing (so "Ignored exception" is printed) or a slightly different executor/dispatcher setup.
The cause is straightforward -- there are active coroutines running on the executor that is concurrently closed.
So, any attempt to (re)dispatch any coroutine on that dispatcher fails. Moreover, often there is no place in the code (or this place might be unrelated) to rethrow such an exception, thus an error. It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).
So I won't triage it as a bug. We can document it better and maybe (though it's a really debatable topic) treat it not as an internal error but instead invoke the global exception handler immediately (which, on Android, will crash the app).
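For illustration, here is a minimal sketch of the repro with the shutdown reordered so that no coroutine still needs the dispatcher when it is closed. This is an assumption about one workable ordering, not something prescribed in this thread; closeAfterJoin and resumedOnDispatcher are hypothetical names.

```kotlin
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun closeAfterJoin(): Boolean = runBlocking {
    val dispatcher = newSingleThreadContext("MyDispatcher")
    var resumedOnDispatcher = false
    val job = launch(dispatcher) {
        try {
            withContext(Dispatchers.Default) { delay(1000) }
        } finally {
            // Leaving withContext redispatches onto `dispatcher`,
            // so the dispatcher must still be open at this point.
            resumedOnDispatcher = true
        }
    }
    delay(200)
    job.cancelAndJoin()  // wait until the coroutine has fully completed
    dispatcher.close()   // only now is it safe to close
    resumedOnDispatcher
}

fun main() {
    println(closeAfterJoin())
}
```

Because cancelAndJoin() returns only after the coroutine has completed, its finally block has already run on the dispatcher before close() is called.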
So I won't triage it as a bug.
The error message clearly states that any such error is worth reporting to us:
kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
So, I don't think we can ignore it when people do just that.
Also, I think this actually is an issue on our side: since there is no guarantee that dispatch doesn't throw, the places where it throws shouldn't be treated as surprises (that is, internal errors).
True, we have to acknowledge that.
since there is no guarantee that dispatch doesn't throw
The documentation (somewhat vaguely, though) states the following:
This method should generally be exception-safe. An exception thrown from this method may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
We can re-visit the corresponding places again, though there are not that many alternatives -- internal error or immediate crash (handleCoroutineException, which is also invoked by the internal error path).
We have another place like this -- ThreadContextElement (the documentation to handleFatalException mentions it)
It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).
So what's the correct way to close the dispatcher?
My real use case is simple: I'm implementing a client that sets up a websocket connection to the backend and creates a singleThreadDispatcher.
When an event is received from the websocket, the client parses it on singleThreadDispatcher, then updates the database withContext(Dispatchers.IO) and emits an onUpdated event to the outside.
The client also has a shutdown() method, where it closes the websocket connection and the singleThreadDispatcher.
Obviously, shutdown could be called at any moment.
So my code looks like:
class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher)
    private var websocket: WebSocket? = null
    val onUpdated = MutableSharedFlow<Event>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)
            withContext(Dispatchers.IO) {
                updateDB(event)
            }
            processEvent(event)
            onUpdated.emit(event)
        }
    }

    fun shutdown() {
        websocket?.close()
        scope.cancel()
        dispatcher.close()
    }
}
Is there a better approach than just maintaining a collection of ioDispatcherJobs and calling joinAll on them in the shutdown() method?
Like:
class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher + SupervisorJob())
    private var websocket: WebSocket? = null
    val onUpdated = MutableSharedFlow<Event>()
    private val ioDispatcherJobs = mutableListOf<Job>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)
            val job = launch(Dispatchers.IO, CoroutineStart.LAZY) {
                updateDB(event)
            }
            ioDispatcherJobs += job
            job.join()
            ioDispatcherJobs -= job
            processEvent(event)
            onUpdated.emit(event)
        }
    }

    suspend fun shutdown() {
        websocket?.close()
        withContext(dispatcher) {
            ioDispatcherJobs.joinAll()
        }
        scope.cancel()
        dispatcher.close()
    }
}
May I suggest not using newSingleThreadContext at all, instead doing Dispatchers.IO.limitedParallelism(1)? This way, you won't need to close the dispatcher at all.
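A minimal sketch of what that suggestion could look like for the Client above. Event, the processed list, and the stubbed-out parsing and database steps are hypothetical stand-ins, and onWebsocketMessage is made public and returns its Job only so the sketch can be exercised on its own.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical event type standing in for the real one.
data class Event(val payload: String)

@OptIn(ExperimentalCoroutinesApi::class)
class Client {
    // A view of Dispatchers.IO that runs at most one task at a time.
    // It owns no thread, so there is nothing to close.
    private val serial = Dispatchers.IO.limitedParallelism(1)
    private val scope = CoroutineScope(serial + SupervisorJob())

    val onUpdated = MutableSharedFlow<Event>()
    val processed = mutableListOf<Event>() // only touched from `serial`

    fun onWebsocketMessage(message: String): Job = scope.launch {
        val event = Event(message)       // stands in for parseMessage(message)
        withContext(Dispatchers.IO) {
            // stands in for updateDB(event)
        }
        processed += event               // back on `serial`
        onUpdated.emit(event)
    }

    fun shutdown() {
        scope.cancel() // no dispatcher.close() required
    }
}

fun main() = runBlocking {
    val client = Client()
    client.onWebsocketMessage("hello").join()
    println(client.processed)
    client.shutdown()
}
```

With this shape, shutdown() only cancels the scope; a cancelled coroutine returning from withContext can still be dispatched, because the underlying Dispatchers.IO is never closed.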
Correct me if I'm wrong, but I don't see in the documentation that limitedParallelism(1) guarantees execution on a single thread.
That means I have to synchronize access to data everywhere, for example in my processEvent method:
val allEvents = mutableListOf<Event>()
val mutex = Mutex()

// Mutex.withLock is a suspending call, so the function must be suspend
suspend fun processEvent(event: Event) {
    mutex.withLock {
        allEvents += event
    }
}
Correct?
Which is exactly what I'm trying to avoid by using newSingleThreadContext
I don't see in documentation that limitedParallelism(1) guarantees execution on single thread.
It doesn't guarantee that this will always run on the same thread, so if you have things like thread local variables, yes, limitedParallelism on its own won't help you; but it does guarantee that the parallelism will be at most 1, or, in other words, at most one thread at a time will execute the code scheduled on that dispatcher. So no, you don't need mutexes: only one thread at a time (though possibly a different one between calls) can call processEvent.
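As a small illustration of that guarantee, the sketch below increments a plain var from many coroutines on a limitedParallelism(1) view, with a suspension point in between. The function name and the use of Dispatchers.Default as the base dispatcher are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun countWithLimitedParallelism(n: Int): Int = runBlocking {
    val serial = Dispatchers.Default.limitedParallelism(1)
    var counter = 0 // plain var: no Mutex, no atomics
    coroutineScope {
        repeat(n) {
            launch(serial) {
                counter++
                yield()   // let other coroutines on `serial` run in between
                counter++
            }
        }
    }
    counter
}

fun main() {
    // Expected to print 2000 if no increment is lost.
    println(countWithLimitedParallelism(1_000))
}
```

The coroutines may hop between worker threads across the yield() call, but since at most one of them runs at any moment, no increment should be lost.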
Thank you, that makes sense. Two more questions regarding limitedParallelism(1) then:
- Does it guarantee FIFO order of operations?
- As it could be executed on different threads, I still have to use atomic vars, otherwise it's not guaranteed that I read the last value set to a var by a different thread. Correct?
var eventCounter by atomic(0)

fun processEvent(event: Event) {
    eventCounter++
}
- Yes, it stores a queue internally.
- No, the happens-before relationship is guaranteed by the coroutines machinery.
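Both answers can be checked with a short sketch: tasks are launched in a known order from one thread and append to a plain, unsynchronized list. The function name and the choice of Dispatchers.Default as the base dispatcher are assumptions for illustration.

```kotlin
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun orderOnSerialDispatcher(n: Int): List<Int> = runBlocking {
    val serial = Dispatchers.Default.limitedParallelism(1)
    val seen = mutableListOf<Int>() // plain list, written only from `serial`
    coroutineScope {
        // All tasks are submitted from this single thread, in order 0..n-1.
        repeat(n) { i ->
            launch(serial) { seen += i }
        }
    }
    seen
}

fun main() {
    val result = orderOnSerialDispatcher(100)
    println(result == (0 until 100).toList())
}
```

If the queue is FIFO and writes are visible from one task to the next, the list should come back as 0 through n-1 in order, even though different threads may have executed different tasks.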
Thank you so much! I'll try to go with limitedParallelism(1)
Regarding the initial issue: I would expect the same behaviour on all platforms. So one possible solution is to make the test crash on Android the same way it crashes on iOS and the JVM.
That way, common code debugged once on Android will work on other platforms without changes.
@rusmonster, could you please explain why you thought that limitedParallelism was unsuitable? @qwwdfsad found this misleading piece of information: https://github.com/KStateMachine/kstatemachine/blob/master/docs/index.md#use-single-threaded-coroutinescope Are there any other ones?