pekko icon indicating copy to clipboard operation
pekko copied to clipboard

BatchingExecutor BlockableBatch.run: "requirement failed" under concurrent task invocations

Open jxtps opened this issue 11 months ago • 3 comments

I'm seeing the following occasional stack trace in production:

java.lang.IllegalArgumentException: requirement failed
	at scala.Predef$.require(Predef.scala:324)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:105)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool.helpJoin(ForkJoinPool.java:2076)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:423)
	at java.base/java.util.concurrent.ForkJoinTask.invokeAll(ForkJoinTask.java:750)
        ...

The ... is my client code, and it does ForkJoinTask.invokeAll(tasks) with an array of RecursiveActions.

This happens during heavily parallelized calls to my client code. Think a bunch of concurrent web requests all needing to issue parallel sub-tasks.

As this is happening based on live traffic I don't have a neatly packaged up reproducible test case immediately available.

Does pekko have multi-threaded stress tests with tasks being invoked in parallel?

This is in pekko-actor 1.0.3 for Scala 2.13, pekko-actor_2.13-1.0.3.jar

jxtps avatar Jan 13 '25 22:01 jxtps

Other contributors may have a different viewpoint on this but I would not really think of Pekko dispatchers as being for general use - that they are more designed for Pekko specific internal usage.

Are your tasks doing Pekko based work?

Were tasks lost when the requirement exception happened or were they retried?

Have you considered using a Scala ExecutionContext or a Java ForkJoinPool directly? Or a framework like ZIO, cats-effect or another general purpose async job framework?

pjfanning avatar Jan 14 '25 15:01 pjfanning

Yes, I have had to switch to using a custom ForkJoinPool.

But regardless, if you look at the code where this happens ( https://github.com/apache/pekko/blob/main/actor/src/main/scala/org/apache/pekko/dispatch/BatchingExecutor.scala#L104 ):

    override final def run(): Unit = {
      require(_tasksLocal.get eq null)
      _tasksLocal.set(this) // Install ourselves as the current batch
      val firstInvocation = _blockContext.get eq null
      if (firstInvocation) _blockContext.set(BlockContext.current)
      BlockContext.withBlockContext(this) {
        try processBatch(this)
        catch {
          case t: Throwable =>
            resubmitUnbatched()
            throw t
        } finally {
          _tasksLocal.remove()
          if (firstInvocation) _blockContext.remove()
        }
      }
    }

What happens is that require(_tasksLocal.get eq null) fails, which makes it look like there's a race condition happening? And regardless of use, that should be made to be not possible, right?

Or maybe I'm misunderstanding how this is supposed to work, but my code "just" does ForkJoinPool.invokeAll, which should always be legal, no?

jxtps avatar Jan 14 '25 17:01 jxtps

Do you have a reproducer for this? But the current implementation seems its only valid for internal usage.

He-Pin avatar Jan 14 '25 18:01 He-Pin