kotlinx.coroutines
Timeout not detected on newSingleThreadContext() when withTimeout wraps a blocking job
If withTimeout wraps a blocking job, it does not detect a timeout in conjunction with newSingleThreadContext(). Timeout detection works as expected with other dispatcher flavors:
import kotlinx.coroutines.* // ktlint-disable no-wildcard-imports
import java.util.concurrent.Executors
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun main(): Unit = runBlocking {
    listOf(
        Triple("newSingleThreadExecutor", { Executors.newSingleThreadExecutor().asCoroutineDispatcher() }, true),
        Triple("newSingleThreadContext", { newSingleThreadContext("single") }, true),
        Triple("newFixedThreadPoolContext(2)", { newFixedThreadPoolContext(2, "double") }, true),
        Triple("IO.limitedParallelism(1)", { Dispatchers.IO.limitedParallelism(1) }, false),
        Triple("IO.limitedParallelism(2)", { Dispatchers.IO.limitedParallelism(2) }, false)
    ).forEach { (name, dispatcher, needsClosing) ->
        print("$name: ")
        val dispatcher = dispatcher()
        try {
            withContext(dispatcher) {
                try {
                    withTimeout(1000) {
                        launch {
                            Thread.sleep(2000)
                            // delay(2000)
                        }
                    }
                    println("no timeout detected")
                } catch (t: Throwable) {
                    println("$t")
                }
            }
        } finally {
            if (needsClosing) (dispatcher as ExecutorCoroutineDispatcher).close()
        }
    }
}
produces:
newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
newSingleThreadContext: no timeout detected
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
Version: kotlinx-coroutines-core:1.7.3
Effects observed here first: https://github.com/kotest/kotest/issues/3672
What are you trying to achieve? withTimeout does not interrupt blocking code as is, you need runInterruptible (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-interruptible.html) for that. The only difference is whether, on a timeout, an exception will be thrown after the code finishes executing.
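A minimal sketch of the runInterruptible approach mentioned above, assuming a JVM target. The helper name blockingCallTimesOut and the 100 ms / 2 s durations are made up for illustration, and Dispatchers.IO is only one reasonable place to run the blocking call:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withTimeout

// Hypothetical helper: returns true if the timeout cut the blocking call short.
suspend fun blockingCallTimesOut(): Boolean =
    try {
        withTimeout(100) {
            // runInterruptible interrupts the running thread when the coroutine
            // is cancelled, so Thread.sleep ends early instead of running to completion.
            runInterruptible(Dispatchers.IO) {
                Thread.sleep(2_000)
            }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }

fun main(): Unit = runBlocking {
    println("timed out: ${blockingCallTimesOut()}")
}
```

Without runInterruptible, the same Thread.sleep would run for the full two seconds regardless of the timeout.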
That's of course all valid and has been noted in https://github.com/kotest/kotest/issues/3672#issuecomment-1703885285 and https://github.com/kotest/kotest/issues/3672#issuecomment-1704310884.
What the original test in question was trying to achieve can probably be best answered by @sschuberth.
The point here is that the timeout exception after the blocking section is usually available, but missing with one type of dispatcher. From a Kotest point of view, this change in behavior is considered a regression.
The code where this Kotest regression occurred is here, the test was written by @oheger-bosch. My interpretation is that the test should ensure that a call to fossId.scanPackage() blocks, and this is being tested by calling it withTimeout, expecting that we run into a TimeoutCancellationException.
Came up with another scenario: What happens if the timeout just occurs during a cpu-bound period, even if the coroutine is non-blocking otherwise? Seems like in this case newSingleThreadContext() also misses the timeout:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Clock
import java.util.concurrent.Executors
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun main(): Unit = runBlocking {
    mapOf<String, CoroutineScope.() -> Unit>(
        "launched Thread.sleep" to {
            launch {
                Thread.sleep(100 /* ms */)
            }
        },
        "launched delay + CPU hog" to {
            launch {
                delay(20.milliseconds)
                hogCpu(100.milliseconds)
            }
        }
    ).forEach { (blockingVariantName, blockingBlock) ->
        println("\n--- $blockingVariantName ---")
        listOf(
            Triple("newSingleThreadExecutor", { Executors.newSingleThreadExecutor().asCoroutineDispatcher() }, true),
            Triple("newSingleThreadContext", { newSingleThreadContext("single") }, true),
            Triple("newFixedThreadPoolContext(2)", { newFixedThreadPoolContext(2, "double") }, true),
            Triple("IO.limitedParallelism(1)", { Dispatchers.IO.limitedParallelism(1) }, false),
            Triple("IO.limitedParallelism(2)", { Dispatchers.IO.limitedParallelism(2) }, false)
        ).forEach { (name, dispatcher, needsClosing) ->
            print("$name: ")
            val dispatcher = dispatcher()
            val startInstant = Clock.System.now()
            try {
                withContext(dispatcher) {
                    try {
                        withTimeout(50.milliseconds) {
                            blockingBlock()
                        }
                        println("no timeout detected")
                    } catch (t: Throwable) {
                        println("$t")
                    }
                }
            } finally {
                if (needsClosing) (dispatcher as ExecutorCoroutineDispatcher).close()
            }
            println("– elapsed time: ${Clock.System.now() - startInstant}")
        }
    }
}
fun hogCpu(duration: Duration) {
    var spinCount = 0
    val startInstant = Clock.System.now()
    while (Clock.System.now() - startInstant < duration) {
        spinCount++
    }
    // println("- spinCount=$spinCount")
}
produces
--- launched Thread.sleep ---
newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 120.536ms
newSingleThreadContext: no timeout detected
– elapsed time: 105.131ms
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 102.367ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 108.744ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 102.126ms
--- launched delay + CPU hog ---
newSingleThreadExecutor: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 133.593ms
newSingleThreadContext: no timeout detected
– elapsed time: 121.507ms
newFixedThreadPoolContext(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.256ms
IO.limitedParallelism(1): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.249ms
IO.limitedParallelism(2): kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 50 ms
– elapsed time: 121.699ms
> My interpretation is that the test should ensure that a call to fossId.scanPackage() blocks, and this is being tested by calling it withTimeout, expecting that we run into a TimeoutCancellationException.
Suggestion:
val time = measureTime {
    // block taking a long time
}
assertTrue(time > 2.seconds)
This way, there's no illusion that the blocking code gets canceled.
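A self-contained variant of that suggestion might look like the following sketch. blockingCall and elapsedOf are hypothetical names, the 200 ms sleep stands in for the real blocking call, and check replaces assertTrue so that no test framework is needed:

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime

// Hypothetical stand-in for the blocking call under test.
fun blockingCall() {
    Thread.sleep(200)
}

// Older Kotlin versions required this opt-in for measureTime; it is harmless on newer ones.
@OptIn(ExperimentalTime::class)
fun elapsedOf(block: () -> Unit): Duration = measureTime(block)

fun main() {
    val time = elapsedOf(::blockingCall)
    // A lower bound somewhat under the sleep time leaves room for timer granularity.
    check(time > 150.milliseconds) { "expected the call to block, but it took $time" }
    println("blocked for $time")
}
```

The assertion only states that the call blocked for a while; it makes no claim about cancellation.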
> What happens if the timeout just occurs during a cpu-bound period
In this case, you're supposed to call ensureActive periodically during your work. Otherwise, there's no way for withTimeout to cancel the block of code.
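As a rough illustration of that advice, the sketch below spins in a CPU-bound loop and calls ensureActive() on every iteration. The helper name and durations are invented for the example. It deliberately runs on Dispatchers.Default, on the assumption that the timeout is then processed on a different thread than the one that is spinning:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout

// Hypothetical helper: spins for up to two seconds unless cancelled first.
// Returns true if the timeout ended the loop early.
suspend fun cooperativeHogTimesOut(): Boolean = withContext(Dispatchers.Default) {
    try {
        withTimeout(50) {
            val deadline = System.nanoTime() + 2_000_000_000L
            while (System.nanoTime() < deadline) {
                // Throws a CancellationException once the timeout has cancelled this scope.
                ensureActive()
            }
        }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }
}

fun main(): Unit = runBlocking {
    println("timed out: ${cooperativeHogTimesOut()}")
}
```

In a real workload, checking every few hundred iterations instead of every one would likely be enough.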
(In my own code, I'm actually using isActive and yield() in CPU-bound blocks.)
Regarding this case, as a library user, without looking into the actual kotlinx.coroutines code, my mental model is as follows:
- withTimeout wraps a block like coroutineScope does.
- At the end, it waits like joinAll() for coroutines launched in that block.
- I'd expect that (at least conceptual) joinAll() to be the place where the cancellation would happen (and the timeout be detected for blocking jobs).
If that is conceptually correct: If a job, which is waited for, happens to be consuming the CPU for a very brief period at the end, and in exactly that period the timeout expires, doesn't that seem racy?
Note that I'm not expecting a cancellation to interrupt that (briefly) blocking job. Just to detect the timeout.
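The "waits for launched coroutines" part of that mental model can be observed with a small sketch. The event labels and the completionOrder name are made up for illustration; everything runs on the single runBlocking thread, so a plain mutable list is enough:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Records the order in which the body, the child, and withTimeout itself finish.
fun completionOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    withTimeout(1_000) {
        launch {
            delay(50)
            events.add("child finished")
        }
        // The body reaches this line right after starting the child.
        events.add("body finished")
    }
    // Reached only once the child has completed as well.
    events.add("withTimeout returned")
    events
}

fun main() {
    println(completionOrder()) // [body finished, child finished, withTimeout returned]
}
```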
> Suggestion:
Thanks, but how do you avoid the test really taking "a long time" then? The test case should complete as quickly as possible.
In this case you should use runInterruptible or the corresponding Kotest blocking/timeout configuration.
> or the corresponding Kotest blocking/timeout configuration.
I just looked at the implementation for Kotest's shouldTimeout, and it turns out it does the same thing that we did manually, see
https://github.com/kotest/kotest/blob/402597ca4bf581f10df2f3d062a2427e0de2d005/kotest-assertions/kotest-assertions-shared/src/jvmMain/kotlin/io/kotest/assertions/async/timeout.kt#L18-L26
So I guess this needs some fixing as well?
If it's for running suspending code, it seems perfectly fine. The problem is only about code that doesn't participate in cooperative cancellation.
Thanks guys, I've fixed this in our code.
Looking at it, here's what I think happens.
- If you remove launch and just have withTimeout(n) { Thread.sleep(2 * n) }, all dispatchers will report that no timeout happened. The reason is that when the coroutine finishes, we think: "Whew, even though the timeout was reached, at least the task finished successfully!" and don't raise an exception. What happens here is different: in withTimeout(n) { launch { ... } }, the body finishes immediately after launching a child, but withTimeout can't complete until all children do. The difference is how the completion of children is treated.
- newSingleThreadContext uses a ScheduledExecutorService internally.
- When a ScheduledExecutorService is used as a coroutine dispatcher, it's also treated as a source of time (Delay). A source of time is itself responsible for handling timeouts and delays regarding the tasks running in it. Things that are not sources of time delegate this to the global source of time.
- When the designated second of waiting is up, for all dispatchers that are not themselves sources of time, the global source of time cancels the coroutine. After the dispatcher is done waiting for Thread.sleep, it notices that cancellation happened and raises an exception.
- On the other hand, newSingleThreadContext also uses its thread to process timeouts. But its thread is busy with Thread.sleep at the point when it should cancel the coroutine. After Thread.sleep, this thread happily proceeds to finish the execution of the coroutine: after all, there is no work left to be done. It then forgoes canceling the coroutine: "No matter, it's finished already."
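To make the "source of time" distinction a little more concrete, the sketch below checks which JDK executors can schedule delayed tasks at all. canScheduleDelays is a hypothetical helper that mirrors the rule as described in this comment; how kotlinx.coroutines inspects the executor internally is an assumption here, not something this snippet verifies:

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

// Hypothetical helper: only executors that can schedule delayed tasks are
// candidates for handling timeouts on their own thread(s).
fun canScheduleDelays(executor: ExecutorService): Boolean =
    executor is ScheduledExecutorService

fun main() {
    val plain = Executors.newSingleThreadExecutor()
    val scheduled = Executors.newSingleThreadScheduledExecutor()
    try {
        println("newSingleThreadExecutor: ${canScheduleDelays(plain)}")              // false
        println("newSingleThreadScheduledExecutor: ${canScheduleDelays(scheduled)}") // true
    } finally {
        plain.shutdown()
        scheduled.shutdown()
    }
}
```

This matches the observed output earlier in the thread, where the dispatcher built from Executors.newSingleThreadExecutor() did report the timeout.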
tl;dr: when we schedule something on a Delay internally, like cancellation due to a timeout, on single-threaded Delay dispatchers, there's a risk of these scheduled blocks executing much later than they are supposed to. I don't see any harm in this so far (the reproducer for this issue is heavily unidiomatic code), but cc @qwwdfsad, I think we have to keep this in mind and either not rely on accurate timings or rethink the guarantees of Delay implementations.
@dkhalanskyjb Thanks so much for looking into it thoroughly and explaining all the details involved. I agree with the reproducer being pretty peculiar. Maybe this corner case could just be documented in a way that is easy for library users to understand (without exposing dispatcher/executor mechanisms).
Would something like this do the trick?
withTimeout does not guarantee a TimeoutCancellationException if the timeout occurs during a blocking section of code, which is not followed by suspending code. You can use yield() after such blocking code to have a guaranteed TimeoutCancellationException.
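A sketch of what that proposed wording would mean in practice, reusing the newSingleThreadContext case from the reproducer with shorter, invented durations. The helper name is hypothetical, and the claim that yield() is sufficient is the proposal being discussed rather than documented library behavior:

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.yield

// Hypothetical helper: returns true if the timeout was reported to the caller.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun timeoutReportedWithYield(): Boolean = runBlocking {
    val dispatcher = newSingleThreadContext("single")
    try {
        withContext(dispatcher) {
            try {
                withTimeout(50) {
                    launch {
                        Thread.sleep(150) // blocks well past the timeout
                        yield()           // suspension point after the blocking section
                    }
                }
                false
            } catch (e: TimeoutCancellationException) {
                true
            }
        }
    } finally {
        dispatcher.close()
    }
}

fun main() {
    println("timeout reported: ${timeoutReportedWithYield()}")
}
```

The yield() call gives the single thread a chance to run the overdue timeout task before the child coroutine is allowed to finish.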
Thanks for the detailed explanation. For me, it looks like a very subtle problem with our delay design: depending on the application's machinery, single-threaded dispatchers (multi-threaded dispatchers as well, but to a lesser extent) might drastically affect cancellations and interruptions policy, case in point.
To make the problem even worse, we cannot unconditionally use our global Delay source -- on Android/Swing/Fx it makes total sense to use the main thread time source (which is testable, aligned with their framerate etc.)