Deadlock with Task.runSyncUnsafe()
Prerequisites
Monix 3.2.2
How to reproduce
First example: runToFuture executes a blocking operation on the main thread, locking execution forever.
```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

import scala.io.StdIn

object MonixLockMainThread extends App {
  val subj = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 2, "test") // or .fixedPool("test", 2)

  val getTask = subj.headL
  val waitTask = Task {
    Thread.sleep(500)
    println("Getting task value") // Printed only once
    val value = getTask.runSyncUnsafe()
    println(value) // Never gets printed
  }

  waitTask.runToFuture // Locks the main thread forever here
  waitTask.runToFuture
  subj.onNext("123")
  println("Waiting") // Also never gets printed
  StdIn.readLine()
  sys.exit(0)
}
```
(Output: Getting task value)
Second example: the main-thread lock is bypassed with evalAsync, but the deadlock still happens. concurrent.blocking() gets ignored and the scheduler doesn't spawn additional threads, which I presume is the expected behaviour: https://stackoverflow.com/questions/29068064/scala-concurrent-blocking-what-does-it-actually-do#29069021
```scala
object MonixDeadLock extends App {
  val subj = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 2, "test") // or .fixedPool("test", 2)

  val getTask = subj.headL
  val waitTask = Task.evalAsync {
    Thread.sleep(500)
    println("Getting task value")
    val value = concurrent.blocking(getTask.runSyncUnsafe()) // Ignores concurrent.blocking
    println(value) // Never gets printed
  }

  waitTask.runToFuture
  waitTask.runToFuture
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}
```
(Output: Waiting, Getting task value, Getting task value)
The first example needs 3 changes:
```scala
object MonixLockMainThread extends App {
  val subj = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 3, "test") // (1)

  val getTask = subj.headL
  val waitTask = Task.evalAsync { // (2)
    Thread.sleep(500)
    println("Getting task value")
    val value = getTask.runSyncUnsafe()
    println(value)
  }

  waitTask.runToFuture
  waitTask.runToFuture
  Thread.sleep(1000) // (3)
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}
```
- You need to increase the maximum number of threads, because you execute 2 blocking operations (runSyncUnsafe);
- By default, runToFuture executes on the calling thread (in this example, the main thread). To start execution on the forkJoin pool, you need to use executeAsync or Task.evalAsync (see the sketch after this list);
- You call onNext before subscribing, so headL waits endlessly for a new event.
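For reference, a minimal sketch of the executeAsync variant mentioned in (2), reusing getTask and the implicit scheduler from the fixed example above (not code from the issue):

```scala
// Hypothetical variant of change (2): instead of Task.evalAsync, force the
// existing Task onto the implicit forkJoin scheduler with executeAsync.
val waitTask = Task {
  Thread.sleep(500)
  println("Getting task value")
  val value = getTask.runSyncUnsafe()
  println(value)
}.executeAsync
```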
The second example needs 2 changes:
```scala
object MonixDeadLock extends App {
  val subj = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 3, "test") // (1)

  val getTask = subj.headL
  val waitTask = Task.evalAsync {
    Thread.sleep(500)
    println("Getting task value")
    val value = concurrent.blocking(getTask.runSyncUnsafe())
    println(value)
  }

  waitTask.runToFuture
  waitTask.runToFuture
  Thread.sleep(1000) // (2)
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}
```
- The maximum number of threads must be greater than the initial parallelism; otherwise the pool will not spawn new threads;
- Same reason as in the first example: onNext must not fire before headL has subscribed.
> By default, runToFuture is performed in the original thread

- runToFuture is a direct replacement for the deprecated runAsync method
- the runToFuture scaladoc says: "Triggers the asynchronous execution"

So that's not the expected behaviour (or it should be documented better).
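A minimal sketch (not from the issue) of the behaviour being disputed here, assuming the implicit forkJoin scheduler from the examples above is in scope:

```scala
// As the first example demonstrates, a synchronous Task started with
// runToFuture is evaluated on the calling thread, up to the first async
// boundary:
Task { println(Thread.currentThread.getName) }.runToFuture
// prints "main" when invoked from the main thread, not a "test" pool thread
```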
> The maximum number of threads must be greater than the initial. Otherwise, the pool will not spawn new threads

The pool should spawn new threads, because all of its threads are marked with the BlockContext. I should be able to spawn any number of such blocking tasks, and they should complete without a deadlock. That's the intended use of concurrent.blocking:

> blocking is meant to act as a hint to the ExecutionContext that the contained code is blocking and could lead to thread starvation. This will give the thread pool a chance to spawn new threads in order to prevent starvation.
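For comparison, a minimal standalone sketch (not from the issue) of what that scaladoc describes, run on scala.concurrent's global pool, which does honor the hint:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingHintDemo extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Oversubscribe the pool: more blocked tasks than CPU cores.
  val n = Runtime.getRuntime.availableProcessors * 4
  val start = System.nanoTime()
  val fs = (1 to n).map(_ => Future(blocking(Thread.sleep(1000))))
  Await.result(Future.sequence(fs), 30.seconds)

  // Because the pool spawns extra threads for blocking sections, all n
  // sleeps overlap and this prints roughly 1 second, not n / cores seconds.
  println(f"elapsed: ${(System.nanoTime() - start) / 1e9}%.2f s")
}
```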
> So that's not the expected behaviour (or it should be documented better).

You're right, I will update the doc; it comes up from time to time. I think the current behavior is intended, as it can be a nice optimization, although we might have to change it to insert an async boundary if local context propagation is enabled.
> The pool should spawn new threads, because all of its threads are marked with the BlockContext. I should be able to spawn any number of such blocking tasks, and they should complete without a deadlock. That's the intended use of concurrent.blocking.

I don't think that's the case; see the ExecutionContext.global scaladoc:

> The maxExtraThreads is the maximum number of extra threads to have at any given time to evade deadlock, see scala.concurrent.BlockContext.

If you're doing a blocking operation in Task, I'd suggest using executeOn with a different scheduler, like Scheduler.io(), which doesn't have an upper limit.
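A minimal sketch of that suggestion (not from the issue), reusing getTask from the examples above; the io value is a name introduced here:

```scala
// Run the blocking section on an unbounded I/O scheduler via executeOn,
// so it cannot starve the capped forkJoin pool. Assumes the implicit
// forkJoin scheduler from the examples is still in scope for runSyncUnsafe.
val io: Scheduler = Scheduler.io("blocking-io")

val waitTask = Task {
  Thread.sleep(500)
  println("Getting task value")
  val value = getTask.runSyncUnsafe() // now blocks an io thread
  println(value)
}.executeOn(io)
```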
> I don't think that's the case; see the ExecutionContext.global scaladoc: "The maxExtraThreads is the maximum number of extra threads to have at any given time to evade deadlock, see scala.concurrent.BlockContext."

But...

> scala.concurrent.context.maxExtraThreads = defaults to "256"

And in the example zero extra threads are spawned.
Yes, it's expected: the equivalent of maxExtraThreads in the forkJoin builder is the maxThreads parameter, which is set to 2 in your examples. What's your expectation about the maxThreads parameter?
Hmm, ok, looks reasonable 👍. After a second investigation I found out that our problem was apparently with the default Akka dispatcher, which doesn't spawn extra threads; both Scheduler.global and forkJoin with maxThreads > parallelism work fine.