
TaskSupport overridden in ForkJoinTasks

Open scabug opened this issue 9 years ago • 12 comments

The wrong ForkJoinPool is used in the following example:

implicit val someForkJoinPoolExecutionContext = ...
val otherForkJoinPoolEC = ...
val someCollection = ...
def foo(): Unit = {
  // Future runs in someForkJoinPoolExecutionContext
  Future {
    val parCollection = someCollection.par
    parCollection.tasksupport = new ExecutionContextTaskSupport(otherForkJoinPoolEC)
    // this is executed in someForkJoinPoolExecutionContext rather than otherForkJoinPoolEC
    parCollection.foreach(...)
  }
}

In Tasks.scala, we see at line 420: https://github.com/scala/scala/blob/5cb3d4ec14488ce2fc5a1cc8ebdd12845859c57d/src/library/scala/collection/parallel/Tasks.scala#L420

if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {

This check overrides the given tasksupport, preferring the execution context of the current thread.
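The effect of that check can be seen with a plain stdlib sketch (object and pool names here are illustrative, not from the library): a par operation started from an ordinary thread sees `false`, so the configured tasksupport is honored, while one started inside any ForkJoinPool task sees `true` and gets forked into that thread's own pool:

```scala
import java.util.concurrent.{Callable, ForkJoinPool, ForkJoinWorkerThread}

object WorkerCheckDemo extends App {
  val poolA = new ForkJoinPool(2)
  // What the check in Tasks.scala sees when a par operation starts inside a pool task:
  val insidePool = poolA.submit(new Callable[Boolean] {
    def call(): Boolean = Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]
  }).get()
  // What it sees when a par operation starts on an ordinary thread:
  val onMain = Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]
  println(s"inside pool task: $insidePool, on main thread: $onMain")
  poolA.shutdown()
}
```

So any par operation kicked off inside a Future that runs on a fork/join-backed execution context (such as `scala.concurrent.ExecutionContext.global`) takes the `true` branch and ignores the assigned tasksupport.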

scabug avatar Mar 30 '16 19:03 scabug

Imported From: https://issues.scala-lang.org/browse/SI-9727?orig=1
Reporter: Sietse Au (@stau)
Affected Versions: 2.12.0-M3
See scala/bug#6998

scabug avatar Mar 30 '16 19:03 scabug

@soc said (edited on Apr 5, 2016 1:43:02 PM UTC): Thanks for the report, Sietse!

Only slightly related to this, I think the mutable bits (mostly taskSupport) need to be cleaned up and removed, and a better immutable way should be provided.

scabug avatar Apr 05 '16 13:04 scabug

@szeiger said: Assigning to M5 because it changes semantics. Maybe out of scope for 2.12 anyway?

scabug avatar Apr 06 '16 16:04 scabug

@soc said: Don't think it is out of scope; even if we change the syntax later, the tests written for this issue will still be useful.

scabug avatar Apr 07 '16 14:04 scabug

When porting from Scala 2.11 to 2.12.12, we found something similar. Here is a simple piece of code to reproduce the problem.

        import java.util.concurrent.ForkJoinPool
        import java.util.concurrent.atomic.AtomicInteger
        import scala.collection.parallel.ForkJoinTaskSupport

        val nbThreads = 2 // less than the number of cores
        val stuff = (0 until 100).par
        stuff.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nbThreads))

        val simultaneousExecutions = new AtomicInteger(0)
        stuff.foreach { userIndex =>
          val sim = simultaneousExecutions.incrementAndGet()
          println(sim)
          (0 to 1000000).par.sum // doing a parallel operation here seems to change the tasksupport of 'stuff'
          Thread.sleep(50)
          simultaneousExecutions.decrementAndGet()
        }
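A hedged sketch of the same effect (assuming Scala 2.13 with the scala-parallel-collections module; `InnerParDemo` and the variable names are mine): the outer foreach does run on the custom pool, but the inner `.par.sum`, having no explicit tasksupport, starts on one of those workers, so the worker-thread check quoted above sends it to the same custom pool, and inner and outer tasks end up competing for the same `nbThreads` workers:

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

object InnerParDemo extends App {
  val custom = new ForkJoinPool(2)
  val outer = (0 until 8).par
  outer.tasksupport = new ForkJoinTaskSupport(custom)

  val seenPools = java.util.concurrent.ConcurrentHashMap.newKeySet[String]()
  outer.foreach { _ =>
    // This inner op has no explicit tasksupport. Since we are already on a
    // worker of `custom`, the worker-thread check forks it into `custom` too.
    (0 to 1000).par.sum
    Thread.currentThread match {
      case w: ForkJoinWorkerThread =>
        seenPools.add(if (w.getPool eq custom) "custom" else "other pool")
      case _ =>
        seenPools.add("non-FJ thread")
    }
  }
  println(seenPools)
  custom.shutdown()
}
```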

michellemay avatar Feb 26 '21 12:02 michellemay

is the problem also reproducible on Scala 2.13.5?

SethTisue avatar Feb 26 '21 14:02 SethTisue

https://github.com/scala/bug/issues/10577

som-snytt avatar Feb 26 '21 16:02 som-snytt

scala/bug#11036

michellemay avatar Feb 26 '21 16:02 michellemay

> is the problem also reproducible on Scala 2.13.5?

Yes, it is: https://scastie.scala-lang.org/I9WfbV6qS5WRR7zrmrIYHA

michellemay avatar Feb 26 '21 16:02 michellemay

The umbrella issue for TaskSupport ergonomics might be https://github.com/scala/scala-parallel-collections/issues/152

Probably @SethTisue intended to close this ticket with other module-related tickets?

I tried to follow the conclusions from a few years ago. For par, using the ForkJoinPool ctor with maximumPoolSize and minimumRunnable isn't sufficient to limit the "outer loop" to 2 submitted tasks and max 2 threads.

A workaround is to call sum inside a Future (using the default context, aka the common pool). (When the inner par operation runs, on the common pool I assume, does it run on the same thread? Is it a fork or a submit?)

Here is the updated snippet. The printlns make it easier to see the order of operations. In the absence of blocking, the outer loop happily submits tasks, and the FJP may spawn an extra thread or two to ensure parallelism. It's crucial to distinguish tasks from threads. I assume the expectation or goal is to run 2 tasks on 2 threads, with the next task (or foreach iteration) running when a thread becomes available.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import scala.collection.parallel.CollectionConverters._

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object Test extends App {
  def t = Thread.currentThread
  def count = Thread.activeCount()
  def custom = {
    val nbThreads = 2 // less than the number of cores
    val asyncly = true
    val limited = false
    if (!limited) new ForkJoinPool(nbThreads)
    else new ForkJoinPool(
      nbThreads,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      /*handler=*/ null,
      /*asyncMode=*/ asyncly,
      /*corePoolSize=*/ nbThreads,
      /*maximumPoolSize=*/ nbThreads, // no excess, default is + 256
      /*minimumRunnable=*/ 0, // allow all blocked, default is 1
      /*saturate=*/ null,
      /*keepAliveTime=*/ 60L,
      TimeUnit.SECONDS,
    )
  }
  val stuff = (0 until 100).par
  stuff.tasksupport = new ForkJoinTaskSupport(custom)
  val commonsupport = new ForkJoinTaskSupport(ForkJoinPool.commonPool)

  val simultaneousExecutions = new AtomicInteger(0)
  val total = new AtomicInteger(0)
  stuff.foreach { userIndex =>
    val sim = simultaneousExecutions.incrementAndGet()
    val loop = total.incrementAndGet()
    println(s"$t: of $count: loop $loop ($sim): ${stuff.tasksupport.environment}")
    //(0 to 1000000).par.sum // Doing a parallel collection operation here seems to change 'stuff' collection tasksupport
    def sum = Future {
      val p = (0 to 10).par
      p.tasksupport = commonsupport
      p.sum
    }
    println(s"$t: of $count: loop $loop ($sim): sum: ${Await.result(sum, Duration.Inf)}")
    //Thread.sleep(50L)
    simultaneousExecutions.decrementAndGet()
  }
}

som-snytt avatar Feb 28 '21 16:02 som-snytt

> The umbrella issue for TaskSupport ergonomics might be #152
>
> Probably @SethTisue intended to close this ticket with other module-related tickets?
>
> I tried to follow the conclusions from a few years ago. For par, using the ForkJoinPool ctor with maximumPoolSize and minimumRunnable isn't sufficient to limit the "outer loop" to 2 submitted tasks and max 2 threads.
>
> A workaround is to call sum inside Future (using default context aka common pool).
>
> Here is the updated snippet.

This is an interesting solution, but not one that is possible given the current code I use. The inner loop is not directly accessible. It would require changing all method signatures and objects to add a commonsupport parameter, which does not make sense since the inner code might run parallel operations anywhere.

My workaround is roughly the opposite of your code sample: I execute the top-level loop with Futures on a fixed execution pool and let the inner code do whatever it wants.
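A minimal stand-alone sketch of that workaround (not the actual production code; names are illustrative): drive the outer loop with Futures on a fixed two-thread pool, so outer concurrency stays bounded no matter what the inner code does:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object OuterFuturesWorkaround extends App {
  // A fixed pool bounds the outer loop regardless of what the inner code does.
  val fixedPool = Executors.newFixedThreadPool(2)
  implicit val outerEc: ExecutionContext = ExecutionContext.fromExecutorService(fixedPool)

  val simultaneous = new AtomicInteger(0)
  val maxSeen = new AtomicInteger(0)

  val work = Future.traverse((0 until 20).toList) { i =>
    Future {
      val now = simultaneous.incrementAndGet()
      maxSeen.updateAndGet(m => math.max(m, now))
      Thread.sleep(10) // stand-in for real work; inner code is free to use .par
      simultaneous.decrementAndGet()
      i
    }
  }
  Await.result(work, Duration.Inf)
  println(s"max simultaneous outer tasks: ${maxSeen.get}") // never exceeds the pool size, 2
  fixedPool.shutdown()
}
```

Unlike a ForkJoinPool, a fixed thread pool never spawns compensation threads when its workers block, which is what keeps the outer concurrency at exactly the configured size.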

michellemay avatar Mar 01 '21 12:03 michellemay

I agree your solution is best. The promise of .par was that I can sprinkle it around and benefit.

som-snytt avatar Mar 01 '21 13:03 som-snytt