scala-parallel-collections
TaskSupport overridden in ForkJoinTasks
The wrong ForkJoinPool is used in the following example:

```scala
implicit val someForkJoinPoolExecutionContext = ...
val otherForkJoinPoolEC = ...
val someCollection = ...

def foo(): Unit = {
  // Future runs in someForkJoinPoolExecutionContext
  Future {
    val parCollection = someCollection.par
    parCollection.tasksupport = new ExecutionContextTaskSupport(otherForkJoinPoolEC)
    // this is executed in someForkJoinPoolExecutionContext rather than otherForkJoinPoolEC
    parCollection.foreach(...)
  }
}
```
In Tasks.scala, we see at line 420: https://github.com/scala/scala/blob/5cb3d4ec14488ce2fc5a1cc8ebdd12845859c57d/src/library/scala/collection/parallel/Tasks.scala#L420

```scala
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
```

which overrides the given `tasksupport`, preferring the execution context of the current thread.
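A minimal sketch of the effect (pool names are illustrative; assumes the scala-parallel-collections module, where `CollectionConverters` provides `.par`): when the parallel operation is started from a thread that is already a `ForkJoinWorkerThread`, the check above forks the task into that thread's own pool, and the pool configured via `tasksupport` is ignored.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._

object TaskSupportIgnored extends App {
  val poolA = new ForkJoinPool(2) // the pool we run *from*
  val poolB = new ForkJoinPool(2) // the pool configured via tasksupport

  poolA.submit(new Runnable {
    def run(): Unit = {
      val xs = (1 to 8).par
      xs.tasksupport = new ForkJoinTaskSupport(poolB)
      // The current thread is a ForkJoinWorkerThread of poolA, so the
      // isInstanceOf check forks the work into poolA instead of poolB.
      xs.foreach(_ => println(Thread.currentThread.getName))
    }
  }).get()

  poolA.shutdown()
  poolB.shutdown()
}
```

Running the same `foreach` from a plain thread (e.g. `main`) instead of from `poolA` makes it execute on `poolB`, which is the configured behavior.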
Imported From: https://issues.scala-lang.org/browse/SI-9727?orig=1 Reporter: Sietse Au (@stau) Affected Versions: 2.12.0-M3 See scala/bug#6998
@soc said (edited on Apr 5, 2016 1:43:02 PM UTC): Thanks for the report, Sietse!
Only slightly related to this, I think the mutable bits (mostly taskSupport) need to be cleaned up and removed, and a better immutable way should be provided.
@szeiger said: Assigning to M5 because it changes semantics. Maybe out of scope for 2.12 anyway?
@soc said: Don't think it is out of scope, even if we change the syntax later, the tests written for this issue will still be useful.
When porting from Scala 2.11 to 2.12.12, we found something similar. Here is a simple piece of code to reproduce the problem:
```scala
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._

val nbThreads = 2 // less than the number of cores
val stuff = (0 until 100).par
stuff.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nbThreads))
val simultaneousExecutions = new AtomicInteger(0)
stuff.foreach { userIndex =>
  val sim = simultaneousExecutions.incrementAndGet()
  println(sim)
  (0 to 1000000).par.sum // doing a parallel operation here seems to change the tasksupport of 'stuff'
  Thread.sleep(50)
  simultaneousExecutions.decrementAndGet()
}
```
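One way to see what is happening (a diagnostic sketch, not from the original report; `ForkJoinTask.getPool` is the standard JDK method returning the pool hosting the current worker thread, or null) is to check, from inside the inner operation, which pool is actually running it. The inner range keeps the default task support, yet on the affected versions its work is forked into the custom pool, competing with the outer iterations:

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._

object WhichPool extends App {
  val custom = new ForkJoinPool(2)
  val stuff = (0 until 4).par
  stuff.tasksupport = new ForkJoinTaskSupport(custom)

  stuff.foreach { _ =>
    // We are on a worker of `custom`, as configured.
    println(s"outer on custom pool: ${ForkJoinTask.getPool eq custom}")
    // This range has the default tasksupport, but because the current
    // thread is a ForkJoinWorkerThread, on the affected versions its
    // tasks are forked into `custom` as well.
    (0 to 10).par.foreach { _ =>
      println(s"inner on custom pool: ${ForkJoinTask.getPool eq custom}")
    }
  }
  custom.shutdown()
}
```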
Is the problem also reproducible on Scala 2.13.5?
https://github.com/scala/bug/issues/10577
scala/bug#11036
> Is the problem also reproducible on Scala 2.13.5?

Yes, it is: https://scastie.scala-lang.org/I9WfbV6qS5WRR7zrmrIYHA
The umbrella issue for TaskSupport ergonomics might be https://github.com/scala/scala-parallel-collections/issues/152
Probably @SethTisue intended to close this ticket with other module-related tickets?
I tried to follow the conclusions from a few years ago. For par, using the ForkJoinPool ctor with maximumPoolSize and minimumRunnable isn't sufficient to limit the "outer loop" to 2 submitted tasks and max 2 threads.
A workaround is to call sum inside Future (using default context aka common pool). (When the inner par operation is run, on the common pool I assume, is that run on the same thread? is it a fork or submit?)
Here is the updated snippet. The printlns make it easier to see the order of operations. In the absence of blocking, the outer loop happily submits tasks, and the FJP may spawn an extra thread or two to ensure parallelism. It's crucial to distinguish tasks from threads. I assume the expectation or goal is to run 2 tasks on 2 threads, with the next task (or foreach iteration) running when a thread is available.
```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import scala.collection.parallel.CollectionConverters._
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object Test extends App {
  def t = Thread.currentThread
  def count = Thread.activeCount()
  def custom = {
    val nbThreads = 2 // less than the number of cores
    val asyncly = true
    val limited = false
    if (!limited) new ForkJoinPool(nbThreads)
    else new ForkJoinPool(
      nbThreads,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      /*handler=*/ null,
      /*asyncMode=*/ asyncly,
      /*corePoolSize=*/ nbThreads,
      /*maximumPoolSize=*/ nbThreads, // no excess, default is + 256
      /*minimumRunnable=*/ 0, // allow all blocked, default is 1
      /*saturate=*/ null,
      /*keepAliveTime=*/ 60L,
      TimeUnit.SECONDS,
    )
  }
  val stuff = (0 until 100).par
  stuff.tasksupport = new ForkJoinTaskSupport(custom)
  val commonsupport = new ForkJoinTaskSupport(ForkJoinPool.commonPool)
  val simultaneousExecutions = new AtomicInteger(0)
  val total = new AtomicInteger(0)
  stuff.foreach { userIndex =>
    val sim = simultaneousExecutions.incrementAndGet()
    val loop = total.incrementAndGet()
    println(s"$t: of $count: loop $loop ($sim): ${stuff.tasksupport.environment}")
    //(0 to 1000000).par.sum // Doing a parallel collection operation here seems to change 'stuff' collection tasksupport
    def sum = Future {
      val p = (0 to 10).par
      p.tasksupport = commonsupport
      p.sum
    }
    println(s"$t: of $count: loop $loop ($sim): sum: ${Await.result(sum, Duration.Inf)}")
    //Thread.sleep(50L)
    simultaneousExecutions.decrementAndGet()
  }
}
```
This is an interesting solution, but not one that is possible given the current code I use. The inner loop is not directly accessible.
It would require changing all method signatures and objects to add a commonsupport parameter. That does not make sense, since the inner code might run parallel operations anywhere.
My workaround is kind of the opposite of your code sample: I execute the top-level loop with futures on a fixed execution pool and let the inner code do whatever it wants.
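That workaround can be sketched like this (illustrative names; a fixed `Executors` pool bounds the outer concurrency, while the inner code keeps using `.par` untouched):

```scala
import java.util.concurrent.Executors
import scala.collection.parallel.CollectionConverters._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object OuterLoopAsFutures extends App {
  val nbThreads = 2 // bound for the outer loop only
  val outerPool = Executors.newFixedThreadPool(nbThreads)
  implicit val outerEc: ExecutionContext = ExecutionContext.fromExecutor(outerPool)

  val work = (0 until 100).map { userIndex =>
    Future {
      // The inner code is untouched and may use .par freely. Since the
      // current thread is a plain pool thread, not a ForkJoinWorkerThread,
      // the parallel operation runs on its own default pool.
      (0 to 1000000).par.sum
    }
  }
  val results = Await.result(Future.sequence(work), Duration.Inf)
  println(results.size)
  outerPool.shutdown()
}
```

Only `nbThreads` futures run at a time; the remaining iterations queue in the fixed pool instead of over-subscribing the custom ForkJoinPool.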
I agree your solution is best. The promise of `.par` was that I could sprinkle it around and benefit.