kotlinx.coroutines
kotlinx.coroutines copied to clipboard
TimeoutCancellationException is thrown inconsistently between flatMapConcat and flatMapLatest
In the code below, the TimeoutCancellationException thrown by withTimeout
is propagated as expected when using flatMapConcat
, but not propagated when using flatMapLatest
.
If withTimeout
is replaced with check(false)
, the exception is propagated as expected in both.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
val flow = flowOf("A", "B")
runBlocking {
var threw1 = false
try {
flow
.flatMapConcat {
callbackFlow<String> {
withTimeout(100) { delay(1000) }
}
}
.collect()
} catch (t: Throwable) {
threw1 = true
println("Error1 $t")
}
var threw2 = false
try {
flow
.flatMapLatest {
callbackFlow<String> {
withTimeout(100) { delay(1000) }
}
}
.collect()
} catch (t: Throwable) {
threw2 = true
println("Error2 $t")
}
if (!threw1) println("flatMapConcat did NOT throw")
if (!threw2) println("flatMapLatest did NOT throw")
}
Produces:
Error1 kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 100 ms
flatMapLatest did NOT throw
Digging deeper and I'm not sure whether this is an issue or not - or the right issue perhaps. I guess it depends on what TimeoutCancellationException is meant to be treated as - is it meant inherit the semantics of CancellationException which it extends - i.e. "It indicates normal cancellation of a coroutine." or does it override that to be used for "unexpected" cancellation of a coroutine?
I assume the former else it probably wouldn't be extending, but it feels most use cases of withTimeout that I can imagine would be for unexpected (or undesired) cancellation. If it's not meant to be used this way then we can replace our uses of withTimeout for one which throws something else.
I noticed flatMapLatest
intenrally uses ChannelFlowOperator
which uses coroutines to collect
and trandform
. If you try that appraoch in flatMapConcat
then shows the same results as in flatMapLatest
.
class CancelationException2 : CancellationException()
val flow = flowOf("A", "B")
runBlocking {
var threw1 = false
try {
flow
.flatMapConcat {
flow<String> {
// throw CancellationException2() // prints Error1 CancellationException2
// val job = launch {
// throw CancellationException2() // does not prints Error1 CancellationException2
// }
// job.join()
}
}
.collect()
} catch (t: Throwable) {
threw1 = true
println("Error1 $t")
}
It does say in CancellationException
:
Thrown by cancellable suspending functions if the coroutine is cancelled while it is suspended. It indicates normal cancellation of a coroutine.
This is not an inconsistency, but rather a semantic difference: see https://github.com/Kotlin/kotlinx.coroutines/pull/2964 with an explanation and https://github.com/Kotlin/kotlinx.coroutines/issues/2942 with actual rationale.
Documentation could've been better here though.
I guess it depends on what TimeoutCancellationException is meant to be treated as - is it meant inherit the semantics of CancellationException which it extends - i.e. "It indicates normal cancellation of a coroutine." or does it override that to be used for "unexpected" cancellation of a coroutine?
It is the former, but it should be the latter: #1374. Right now it's just our design mistake that is really tough to fix
I appreciate the clarification, thanks very much