`runBlocking` should let go of CPU token before parking the thread
**What do we have now?**

`runBlocking` parks the thread holding the CPU token if it happens on a thread of `Dispatchers.Default`.
**What should be instead?**

`runBlocking` should let go of the CPU token before parking, and "re-acquire" the token before un-parking (it should be un-parked in a state where the token is already held by it).
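For concreteness, a minimal sketch of the intended protocol. Modelling the CPU-token pool as a semaphore is an assumption of mine; the real scheduler tracks tokens internally, and `parkReleasingCpuToken` is not a real API:

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.locks.LockSupport

// Assumed model of the CPU-token pool: one permit per core.
// The real CoroutineScheduler keeps this state internally.
val cpuTokens = Semaphore(Runtime.getRuntime().availableProcessors())

// Sketch of the proposed protocol for runBlocking's event loop:
// give the token back before parking, take it again before resuming.
fun parkReleasingCpuToken() {
    cpuTokens.release()   // another worker may now take the CPU slot
    LockSupport.park()    // a parked thread doesn't need a token
    cpuTokens.acquire()   // re-acquire before running coroutine tasks again
}
```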
**Why?**

The current solution is just "don't use `runBlocking`" followed by "at least don't use `runBlocking` inside `Dispatchers.Default`", which is not a solution. This does not work in real-life scenarios, especially with large mixed codebases like IJ. In IJ we've tried to leverage #3439, but the approach is stillborn because it causes #3982 on a multi-threaded scale, and we haven't even started to tackle the thread locals which leak from the outer thread into inner tasks.

In other similar scenarios (FJ's managed block), another thread is spawned to compensate for the blocked one. On the JVM this is the most viable approach to this day. It's better to spawn an extra thread, which might do some other work later on or just die after a timeout, and to risk OOME, than to have a starvation deadlock.
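For reference, the JDK exposes this compensation mechanism via `ForkJoinPool.managedBlock` (a real API); here is a minimal usage sketch, with the helper name `awaitCompensated` being mine:

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool

// Illustration of FJ's compensation: managedBlock may spawn a spare
// worker thread while the current worker waits on the latch.
fun awaitCompensated(latch: CountDownLatch) {
    ForkJoinPool.managedBlock(object : ForkJoinPool.ManagedBlocker {
        override fun isReleasable() = latch.count == 0L
        override fun block(): Boolean {
            latch.await()
            return true
        }
    })
}
```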
> This does not work in real-life scenarios, especially with large mixed codebases like IJ.
`runBlocking` has "blocking" in its name, so the idea is that it should be treated like any other blocking task. Do you also have an issue with the fact that a `Dispatchers.Default` thread parks when connecting to the Internet or reading a file? If yes, why not use `Dispatchers.IO` instead of `Dispatchers.Default` everywhere? If not, what's the difference in your mind between `runBlocking` (which is literally "block the thread until these tasks are finished") and a file read?
> `runBlocking` has "blocking" in its name, so the idea is that it should be treated like any other blocking task.
Then a nested `runBlocking` should not steal tasks from the outer one.
> Do you also have an issue with the fact that a `Dispatchers.Default` thread parks when connecting to the Internet or reading a file?
No, not really; it's something beyond the control of the library.
Let me describe an issue we've faced in IJ.
Consider a platform API interface which is supposed to be implemented in 3rd-party plugins:
```kotlin
interface CoolExtension {
    fun computeStuff(): Any
}
```
The platform calls `computeStuff()` on a thread from a global thread pool, unrelated to coroutines.
Now, the client decides to use coroutines:
```kotlin
class MyCoolExtension : CoolExtension {
    override fun computeStuff(): Any {
        return runBlocking {
            42
        }
    }
}
```
After some time, the platform API evolves, and a suspending entry point is added:
```kotlin
interface CoolExtension {
    suspend fun computeStuffS(): Any = computeStuff()

    @Deprecated(...)
    fun computeStuff(): Any = error("Implement me or computeStuffS")
}
```
`computeStuffS` is now the entry point, and it delegates to the old `computeStuff` to maintain compatibility.

The platform calls `computeStuffS` in `Dispatchers.Default` because we want to limit the effective parallelism of the system to the CPUs. The problem: if the client switches to `Dispatchers.Default` inside `runBlocking`, we are risking thread starvation. We've already faced this problem several times, and at the moment the fix is to avoid using `Dispatchers.Default`.
**Possible solutions**
1. Don't run `computeStuff` on `Dispatchers.Default`:
   ```kotlin
   interface CoolExtension {
       suspend fun computeStuffS(): Any =
           withContext(oldGlobalThreadPoolExecutor.asCoroutineDispatcher()) {
               computeStuff()
           }

       @Deprecated(...)
       fun computeStuff(): Any = error("Implement me or computeStuffS")
   }
   ```
   In particular, this means we cannot guarantee that there are at most X=CPU threads active at any given moment, which defeats the purpose of `Dispatchers.Default`. Also, now we are prone to excessive thread context switches, but this could be fixed by replacing `oldGlobalThreadPoolExecutor` with `Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)` (see the sketch after this list).
2. Switch from `Dispatchers.Default` right before calling `runBlocking`. We cannot do that ourselves because `runBlocking` is inside 3rd-party code, but `runBlocking` itself could do the switch. With this approach the `runBlocking` queue would be processed without a token (i.e. outside of the CPU limit), which is why I'm considering that `runBlocking` should instead let go of the token before parking the thread.

3. (I'm open to other solutions.)
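To illustrate the context-switch fix mentioned in Solution 1, here is a sketch of the bridge using `Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)`; the function name `computeStuffBridged` is mine and not part of any API:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

interface CoolExtension {
    fun computeStuff(): Any // blocking, possibly using runBlocking inside
}

// Sketch: bridge to the blocking API through an effectively unlimited
// view of Dispatchers.IO, so no CPU token is held while it blocks.
suspend fun CoolExtension.computeStuffBridged(): Any =
    withContext(Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)) {
        computeStuff()
    }
```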
First, an important thing: making `runBlocking` steal tasks is not unambiguously good. It can also lead to a regression, even when there are still threads available for running tasks.
```kotlin
val process = launch {
    delay(100)
}
launch(Dispatchers.Default) {
    runBlocking {
        println("Joining the task...")
        process.join()
    }
    println("The task is finished")
}
launch(Dispatchers.Default) {
    Thread.sleep(10000) // do something for a long time
}
```
Imagine that a thread of `Dispatchers.Default` enters `runBlocking`, notices that `join` can't finish yet, and executes other tasks. Unfortunately, it steals a task that takes a long time to run. In the meantime, the process was joined a long time ago, but the thread doing `runBlocking` can't proceed until `Thread.sleep` completes.
In a single-threaded environment, the problem of liveness is not as pronounced: it's par for the course that a single long-running task prevents progress everywhere else. Here, it's really strange that `runBlocking` waits for something completely unrelated when other threads could have taken the task.
> making `runBlocking` steal tasks is not unambiguously good
Exactly! This and #3982 are the reasons #3439 (stealing inside `runBlocking`) does not work. I'm sure there are other reasons.
After reading everything in #3982, #3439, and this issue several times, I understand the proposed solutions, but I still have no idea what the problem even is, how many problems there are, or how the solutions map to the actual problems (and not simplified reproducers).
Here's one question that has a chance of helping us. Imagine a thread pool that freely spawned up to `n` threads by default and was prepared to create new threads, but with an exponentially increasing delay for each additional thread: for example, 8 threads by default, a 9th after a quarter of a second of all 8 being busy, a 10th after half a second more, an 11th after a second more, etc. Would all the problems be solved if we provided an API to replace `Dispatchers.Default` with such a thread pool? If not, why not?
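For concreteness, the escalation schedule just described could be expressed like this; the function is purely illustrative, not a proposed API:

```kotlin
// Delay before the pool may create the k-th thread beyond the default n:
// the wait doubles with each extra thread (250 ms, 500 ms, 1 s, ...).
fun delayBeforeExtraThreadMillis(extraThreadIndex: Int): Long =
    250L shl extraThreadIndex
```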
If you think this question misses the mark entirely, could you explain the big picture with a clear breakdown of the design constraints we have to fit into? If not, I doubt I can say anything useful and you'll probably have to proceed without me.
Let me elaborate on this first: https://github.com/Kotlin/kotlinx.coroutines/issues/3983#issuecomment-1852458333
Two things to consider:

- `runBlocking` on a thread from a limited dispatcher may cause a starvation deadlock (see the runnable sketch after this list):

  ```kotlin
  val d = Dispatchers.IO.limitedParallelism(10)
  withContext(d) {
      runBlocking {
          withContext(d) { // starvation here
              ...
          }
      }
  }
  ```

- `runBlocking` is advised as a bridge to switch from the blocking to the coroutine world.
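As referenced in the first point above, here is a minimal self-contained reproducer. It uses `limitedParallelism(1)` (my simplification) so that a single call chain is enough to hang; with 10 permits you would need ten concurrent calls:

```kotlin
import kotlinx.coroutines.*

// Deterministic starvation: the outer withContext holds the only permit
// while runBlocking blocks its thread, so the inner withContext can never
// be dispatched. This program hangs by design.
fun main() = runBlocking {
    val d = Dispatchers.IO.limitedParallelism(1)
    withContext(d) {
        runBlocking {
            withContext(d) { // starvation here
                println("never printed")
            }
        }
    }
}
```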
The problem: we cannot run a 3rd-party implementation of a blocking `CoolExtension.computeStuff` on any limited dispatcher (including `Dispatchers.Default`) because we have no control over the 3rd-party implementation:

- we don't know if it uses `runBlocking`;
- if it does, we don't know if it switches back to `Dispatchers.Default` deeper in the call chain, e.g. by calling another platform API which does the switch to `Dispatchers.Default`.
In general, we have to call `CoolExtension.computeStuff` on an unlimited dispatcher (potentially wasting CPU) because switching from `Dispatchers.Default` to an unlimited dispatcher releases a CPU token.
How is this situation different from the user code, say, accessing the Internet? That will also block the thread that you provide it with. If the 3rd-party implementation is some arbitrary code, then either you can require that it does nothing blocking there (for example, by validating the implementation with BlockHound), or you can't expect them to be good citizens, so they will eventually eat the thread that you give them.
Re https://github.com/Kotlin/kotlinx.coroutines/issues/3983#issuecomment-1862601859
This approach would solve the starvation problem, because eventually there would be enough threads to handle all the tasks. But with this approach there will be no guarantee that the parallelism is equal to the CPU count: after a bunch of threads are spawned, there will be a window where all of them are unblocked and all of them process the global CPU-bound queue together.
> How is this situation different from the user code, say, accessing the Internet?
A blocking operation, which blocks on IO, just blocks:

- we have no control over it => we choose not to think about it; here we can only hope that the implementation will be a good citizen;
- it will eventually be unblocked, and the system will proceed.
On the other hand, `runBlocking`, which switches to the same dispatcher, causes starvation, and there is no way of exiting this state.
> If the 3rd-party implementation is some arbitrary code, then either you can require that it does nothing blocking there
Ok, but let's consider this:

- There is a blocking API with no constraints.
- I, as a 3rd-party developer, started to use `runBlocking` because that's the official advice.
- Suddenly, a wild suspending API appears, and it delegates to the blocking one to maintain compatibility.

It would be okay if a suspending API just appeared, giving me a choice to migrate to it at a convenient time, but instead my current implementation becomes broken because I've used `runBlocking`.
This issue is about the evolution of existing APIs; it's about calling code which already uses `runBlocking` under the hood. I've provided an example of evolution here.
> But with this approach there will be no guarantee that the parallelism is equal to the CPU count
That's another big thing I don't understand, yes. Do you actually need that guarantee?
Quoting your initial message:
> In other similar scenarios (FJ's managed block), another thread is spawned to compensate for the blocked one. On the JVM this is the most viable approach to this day. It's better to spawn an extra thread, which might do some other work later on or just die after a timeout, and to risk OOME, than to have a starvation deadlock.
I get a strong impression that you're okay with utilizing extra threads to resolve deadlocks.
> it will eventually be unblocked, and the system will proceed.
It can realistically take several seconds or more. If you're okay with spawning extra threads, this would be a good time to do that, no?
> On the other hand, `runBlocking`, which switches to the same dispatcher, causes starvation, and there is no way of exiting this state.
I think I see this point, thank you.
> There is a blocking API with no constraints.
And then this blocking API with no constraints starts to be unconditionally run on `Dispatchers.Default`?
Here's another possible way to perform this migration:
```kotlin
interface CoolExtension {
    fun computeStuff(): Any

    suspend fun computeStuffS(): Any {
        throw SuspendImplNotProvided()
    }
}

internal class SuspendImplNotProvided : Exception()

suspend fun CoolExtension.doComputeStuff(): Any =
    try {
        withContext(Dispatchers.Default) {
            computeStuffS()
        }
    } catch (e: SuspendImplNotProvided) {
        withContext(Dispatchers.IO) {
            computeStuff() // can potentially block, so using the IO dispatcher
        }
    }
```
> > But with this approach there will be no guarantee that the parallelism is equal to the CPU count
>
> That's another big thing I don't understand, yes. Do you actually need that guarantee?
Well, yes! Isn't that the whole thing `Dispatchers.Default` is about? I mean, without this guarantee it would be enough to have a single limited dispatcher without separating `Dispatchers.Default` and `Dispatchers.IO`.
> I get a strong impression that you're okay with utilizing extra threads to resolve deadlocks.
Yes, extra threads (and risking OOM) are better than a total deadlock.
> > it will eventually be unblocked, and the system will proceed.
>
> It can realistically take several seconds or more. If you're okay with spawning extra threads, this would be a good time to do that, no?
Correct, it would be a good time. But, again, we don't have control over the implementation of JVM IO.
> > There is a blocking API with no constraints.
>
> And then this blocking API with no constraints starts to be unconditionally run on `Dispatchers.Default`?
This is what we'd want: run everything on `Dispatchers.Default`, occasionally switching to IO or EDT (event dispatch thread, or UI thread) when needed. Running everything on `Dispatchers.Default` yields the least amount of fighting over CPUs between threads. But the problem is not even `Dispatchers.Default`: the starvation can happen with any limited dispatcher!
> Here's another possible way to perform this migration:
So, basically, this:
```kotlin
interface CoolExtension {
    fun computeStuff(): Any

    suspend fun computeStuffS(): Any =
        withContext(Dispatchers.IO) { // can potentially block, so using the IO dispatcher
            computeStuff()
        }
}

suspend fun CoolExtension.doComputeStuff(): Any =
    withContext(Dispatchers.Default) {
        computeStuffS()
    }
```
Listed as Possible Solution 1
> But, again, we don't have control over the implementation of JVM IO.
Exactly. This is why, even if we change what happens in kotlinx-coroutines, you still won't have control over what happens in `computeStuff`. It's basically a black box that may or may not behave nicely with your threads. One of the reasons may be the use of `runBlocking`. The more general solution seems to be to allocate new threads in dire conditions.
> Listed as Possible Solution 1
Oh, ok, I misunderstood that point, then. Yes, I like this solution the most: until the author of `CoolExtension` has marked their code as async-ready and eliminated the blocking behavior, wasting a CPU token on various blocking tasks would mean heavy underutilization of the CPU, occasionally to the point of starvation.
> In particular, this means we cannot guarantee that there are at most X=CPU threads active at any given moment, which defeats the purpose of `Dispatchers.Default`.

and

> Running everything on `Dispatchers.Default` yields the least amount of fighting over CPUs between threads.
Looks like the crucial point. Do I understand correctly that you are prepared to give up some of the parallelism by allowing some of the threads with CPU tokens to block in exchange for improving the ratio of useful work over the total work? I can imagine wanting to do this if the goal is to reduce energy consumption, for example.
> It's basically a black box that may or may not behave nicely with your threads. One of the reasons may be the use of `runBlocking`.
With regular IO, which we don't have control over, this is indeed a black box. I'd argue that the `runBlocking` case would be a black box if it didn't cause starvation; considering that it does, we have a point of introspection into said box, which makes it non-black.
> The more general solution seems to be to allocate new threads in dire conditions.
I don't really understand the proposition. How would you detect when to spawn a new thread?
> Do I understand correctly that you are prepared to give up some of the parallelism by allowing some of the threads with CPU tokens to block in exchange for improving the ratio of useful work over the total work?
Not really. I argue that some situations can be detected, e.g. a `runBlocking` which is about to park. Once it's parked, it's parked: the thread is not even considered for scheduling purposes by the system scheduler => there is no sense in holding onto the CPU token => the coroutine scheduler should give the token to another thread (and spawn one if there is no other thread). In this sense I don't really want to give up some parallelism; I want threads to do the maximum amount of work, and if some thread blocks, then another thread should continue doing the work, so that the total number of active threads under load equals the number of CPUs.
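A sketch of that hand-off; the `Scheduler` interface below is hypothetical, standing in for internals the real `CoroutineScheduler` keeps private:

```kotlin
// Hypothetical scheduler surface, for illustration only.
interface Scheduler {
    fun releaseCpuToken()
    fun hasQueuedCpuTasks(): Boolean
    fun hasIdleWorker(): Boolean
    fun spawnWorker()
}

// Sketch of the hand-off: before runBlocking parks, return the CPU token
// and compensate with a fresh worker if queued work would otherwise stall.
fun onRunBlockingAboutToPark(scheduler: Scheduler) {
    scheduler.releaseCpuToken()
    if (scheduler.hasQueuedCpuTasks() && !scheduler.hasIdleWorker()) {
        scheduler.spawnWorker()
    }
}
```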
I believe there is only one user-callable function in the library which blocks: `runBlocking` (please correct me if I'm wrong). This means that, if it worked like I propose, it could safely be recommended, and one could write code without unexpected deadlocks.
I hate to bring it up here, but this is where Loom is expected to shine. Basically, a coroutine library implementation on top of Loom would spawn a new virtual thread per coroutine and use blocking JVM API calls to park/join, effectively delegating the scheduling to the VM. I wonder if any work has been done in this direction. This would cover IO being a black box: the VM would detect it and unmount the virtual thread before mounting another.
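This is not the per-coroutine virtual-thread scheduler described above, but a taste of the delegation is already expressible today with real APIs (assuming JDK 21+):

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher

// Each dispatched task runs on a fresh virtual thread; when it blocks,
// the JVM unmounts it from its carrier instead of wasting a platform thread.
val loomDispatcher = Executors.newVirtualThreadPerTaskExecutor()
    .asCoroutineDispatcher()
```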
@dovchinnikov, does https://github.com/Kotlin/kotlinx.coroutines/pull/4084#issuecomment-2112234723 mean that there is no more intention to pursue this approach?
We have another idea for how to deal with thread starvation, though it may hit performance in some cases. I'll publish another MR when it's ready.