
Deadlock with Task.runSyncUnsafe()

Open Karasiq opened this issue 4 years ago • 6 comments

Prerequisites

Monix 3.2.2

How to reproduce

First example: runToFuture executes a blocking operation on the main thread, thus locking the execution forever.

import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

import scala.io.StdIn

object MonixLockMainThread extends App {
  val subj               = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 2, "test") // or .fixedPool("test", 2)
  val getTask            = subj.headL
  val waitTask = Task {
    Thread.sleep(500)
    println("Getting task value") // Printed only once
    val value = getTask.runSyncUnsafe()
    println(value) // Never gets printed
  }
  waitTask.runToFuture // Locks main thread forever here
  waitTask.runToFuture
  subj.onNext("123")
  println("Waiting") // Also never gets printed
  StdIn.readLine()
  sys.exit(0)
}

(Output: Getting task value)

Second example: the main-thread lock is bypassed with evalAsync, but the deadlock happens anyway. concurrent.blocking() gets ignored and the scheduler doesn't spawn additional threads, which I presume is the expected behaviour: https://stackoverflow.com/questions/29068064/scala-concurrent-blocking-what-does-it-actually-do#29069021

import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

import scala.io.StdIn

object MonixDeadLock extends App {
  val subj               = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 2, "test") // or .fixedPool("test", 2)
  val getTask            = subj.headL
  val waitTask = Task.evalAsync {
    Thread.sleep(500)
    println("Getting task value")
    val value = concurrent.blocking(getTask.runSyncUnsafe()) // Ignores concurrent.blocking
    println(value) // Never gets printed
  }
  waitTask.runToFuture
  waitTask.runToFuture
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}

(Output: Waiting, Getting task value, Getting task value)

Karasiq avatar Jun 11 '20 11:06 Karasiq

The first example needs 3 changes:

import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

import scala.io.StdIn

object MonixLockMainThread extends App {
  val subj               = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 3, "test") // (1)
  val getTask            = subj.headL
  val waitTask = Task.evalAsync { // (2)
    Thread.sleep(500)
    println("Getting task value")
    val value = getTask.runSyncUnsafe()
    println(value)
  }
  waitTask.runToFuture
  waitTask.runToFuture
  Thread.sleep(1000) // (3)
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}
  1. You need to increase the maximum number of threads, because you execute 2 blocking operations (runSyncUnsafe);
  2. By default, runToFuture is performed in the original thread (here, the main thread). To start execution on the forkJoin pool, you need to use executeAsync or Task.evalAsync;
  3. You call onNext before subscribing, so headL waits endlessly for a new event.
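
An alternative that avoids the deadlock entirely is to compose the tasks instead of calling runSyncUnsafe inside another task. A minimal sketch under the same setup (the object name and the trailing sleeps are made up for illustration):

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

object MonixComposed extends App {
  implicit val scheduler: Scheduler = Scheduler.forkJoin(2, 2, "test")
  val subj = ConcurrentSubject.publish[String](Scheduler.global)

  // Compose with flatMap instead of parking a thread in runSyncUnsafe:
  // no thread blocks, so parallelism 2 is enough even for many subscribers.
  val waitTask = subj.headL.flatMap(value => Task(println(value)))

  waitTask.runToFuture
  waitTask.runToFuture
  Thread.sleep(1000) // crude: give both subscriptions time to attach
  subj.onNext("123")
  Thread.sleep(500)  // let the callbacks run before the JVM exits
}
```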

The second example needs 2 changes:

import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject

import scala.io.StdIn

object MonixDeadLock extends App {
  val subj               = ConcurrentSubject.publish[String](Scheduler.global)
  implicit val scheduler = Scheduler.forkJoin(2, 3, "test") // (1)
  val getTask            = subj.headL
  val waitTask = Task.evalAsync {
    Thread.sleep(500)
    println("Getting task value")
    val value = concurrent.blocking(getTask.runSyncUnsafe())
    println(value)
  }
  waitTask.runToFuture
  waitTask.runToFuture
  Thread.sleep(1000) // (2)
  subj.onNext("123")
  println("Waiting")
  StdIn.readLine()
  sys.exit(0)
}
  1. The maximum number of threads must be greater than the initial. Otherwise, the pool will not spawn new threads;
  2. Same reason as in the first example.

hegelwin avatar Jun 16 '20 21:06 hegelwin

By default, runToFuture is performed in the original thread

  1. runToFuture is a direct replacement for the deprecated runAsync method
  2. In the runToFuture scaladoc:

Triggers the asynchronous execution

So that's not the expected behaviour (or it should be documented better).

The maximum number of threads must be greater than the initial. Otherwise, the pool will not spawn new threads

The pool should spawn new threads, because all of its threads are marked with BlockContext. I should be able to spawn any number of such blocking tasks, and they should all complete without a deadlock. That's the intended use of concurrent.blocking:

blocking is meant to act as a hint to the ExecutionContext that the contained code is blocking and could lead to thread starvation. This will give the thread pool a chance to spawn new threads in order to prevent starvation.
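
The documented behaviour can be demonstrated on scala.concurrent's global pool with a minimal stand-alone sketch (the object and value names are made up, and it assumes more tasks than cores):

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

object BlockingHintDemo extends App {
  val n     = 16 // deliberately more tasks than cores on most machines
  val latch = new CountDownLatch(n)
  for (_ <- 1 to n) Future {
    blocking { // hint: allows the global pool to grow beyond core size
      latch.countDown()
      latch.await() // every task blocks until all n are running at once
    }
  }
  latch.await() // reaches zero only if the pool spawned extra threads
  println("all tasks ran concurrently")
}
```

Wrapping the body in blocking lets the global ExecutionContext spawn up to maxExtraThreads (default 256) extra threads, so all n tasks run at once; removing the blocking wrapper deadlocks on machines with fewer than n cores.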

Karasiq avatar Jun 19 '20 09:06 Karasiq

So that's not the expected behaviour (or it should be documented better).

You're right, I will update the doc, it comes up from time to time. I think the current behavior is intended, it can be a nice optimization. Although we might have to change it to insert async boundary if a local context propagation is enabled.

The pool should spawn new threads, because all of its threads are marked with BlockContext. I should be able to spawn any number of such blocking tasks, and they should all complete without a deadlock. That's the intended use of concurrent.blocking.

I don't think that's the case, see ExecutionContext.global scaladoc:

The maxExtraThreads is the maximum number of extra threads to have at any given time to evade deadlock, see scala.concurrent.BlockContext.

If you're doing a blocking operation in Task, I'd suggest to use executeOn on a different scheduler, like Scheduler.io() which doesn't have an upper limit
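
That suggestion looks roughly like this (a sketch; the object and pool names are made up):

```scala
import monix.eval.Task
import monix.execution.Scheduler

object ExecuteOnIo extends App {
  implicit val compute: Scheduler = Scheduler.forkJoin(2, 2, "compute")
  val io = Scheduler.io("blocking-io") // unbounded, intended for blocking calls

  val blockingTask = Task {
    Thread.sleep(500) // stand-in for a real blocking call
    "done"
  }.executeOn(io) // runs on the io pool; compute threads stay free

  println(blockingTask.runSyncUnsafe()) // blocks only the main thread
}
```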

Avasil avatar Jun 19 '20 10:06 Avasil

I don't think that's the case, see ExecutionContext.global scaladoc:

The maxExtraThreads is the maximum number of extra threads to have at any given time to evade deadlock, see scala.concurrent.BlockContext.

But...

scala.concurrent.context.maxExtraThreads = defaults to "256"

And in the example, zero extra threads are spawned.

Karasiq avatar Jun 22 '20 11:06 Karasiq

I don't think that's the case, see ExecutionContext.global scaladoc:

The maxExtraThreads is the maximum number of extra threads to have at any given time to evade deadlock, see scala.concurrent.BlockContext.

But...

scala.concurrent.context.maxExtraThreads = defaults to "256"

And in the example, zero extra threads are spawned.

Yes, it's expected: the equivalent of maxExtraThreads in the forkJoin builder is the maxThreads parameter, which is set to 2 in your examples.

What's your expectation about maxThreads parameter?

Avasil avatar Jun 22 '20 11:06 Avasil

Yes, it's expected: the equivalent of maxExtraThreads in the forkJoin builder is the maxThreads parameter, which is set to 2 in your examples.

What's your expectation about maxThreads parameter?

Hmm, ok, looks reasonable 👍. After a second investigation I found out that our problem was apparently with the default Akka dispatcher, which doesn't spawn extra threads; both Scheduler.global and forkJoin with maxThreads > parallelism work fine.

Karasiq avatar Jun 22 '20 13:06 Karasiq