The worker crashed / Loop exceptionally stopped without resolving the promise

Open spiritix opened this issue 3 years ago • 9 comments

I'm continuously getting the following errors:

The worker crashed - amphp/parallel/lib/Worker/TaskWorker.php:111
Loop exceptionally stopped without resolving the promise - amphp/amp/lib/functions.php:231
Worker in pool exited unexpectedly with code -1 - amphp/parallel/lib/Worker/DefaultPool.php:254

Unfortunately, I can't reproduce these issues reliably: sometimes it works, sometimes it doesn't. It seems to be related to current RAM and CPU usage.

How can I further debug these issues? I don't see any further error messages from the workers themselves.

spiritix, Mar 10 '21 12:03

Do you have PHP error logs enabled? Do these reveal anything?

kelunik, Mar 10 '21 21:03

Yeah, I'm actually even using Papertrail to collect all logs. Nothing relevant in there.

spiritix, Mar 18 '21 15:03

Hi @kelunik, I'm facing the same issue too, on $pool->shutdown().

I have neither the parallel nor the pthreads extension, so WorkerProcess is being used.

class AmpCoroutinePerformer
{
...
    public function perform(array $tasks, int $maxCoroutinesCount): array
    {
        $pool = new DefaultPool($maxCoroutinesCount, $this->workerFactory);
        $results = [];
        Loop::run(
            function () use ($pool, $tasks, &$results) {
                // Creates a context using Process, or if ext-parallel is installed, Parallel.
                $coroutines = [];

                foreach ($tasks as $taskKey => $task) {
                    $coroutines[$taskKey] = call(
                        function () use ($pool, $task, $taskKey) {
                            $result = yield $pool->enqueue($task);
                            $this->logDebug(sprintf('A result is obtained for task %s', $taskKey));
                            return $result;
                        }
                    );
                }

                $results = yield all($coroutines);

                try {
                    return yield $pool->shutdown();
                } catch (Throwable $t) {
                    $data['trace_string'] = $t instanceof TaskFailureThrowable
                        ? $t->getOriginalTraceAsString()
                        : $t->getTraceAsString();
                    $this->logDebug(
                        sprintf(
                            'An "%s" throwable has been caught on workers shutdown. Message: %s',
                            get_class($t),
                            $t->getMessage()
                        ),
                        $data
                    );
                    return new Promise(
                        function () use ($pool) {
                            $this->logDebug('Performing kill ()...');
                            $pool->kill();
                        }
                    );
                }
            }
        );
        return $results;
    }

}

class App {
   public function start()
   {
   ...
        /** @var FirstLevel[] $tasks */
         $results = $tasksPerformer->perform($tasks, $maxTasksCount);
         //no error on the loop shutdown without second-level loop usage
         //error exists on the loop shutdown when using second-level loop for subtask
   ...
   }
}

class FirstLevel implements \Amp\Parallel\Worker\Task
{
  public function run(Environment $environment)
  {
  ...
  try {
     //return $endResult; //returns results, without subtasks, no error on shutdown
        /** @var SecondLevel[] $tasks */
         $results = $tasksPerformer->perform($tasks, $maxTasksCount);
         $endResult = $this->mergeResults($results); //here we use second-level loop (run subtasks)
         return $endResult; //returns results, with subtasks, error on shutdown in the first-level loop
     } catch (\Throwable $t) {
            Helper::flattenThrowableBacktrace($t);
            return $t;
     }
  ...
  }
}

class SecondLevel implements \Amp\Parallel\Worker\Task
{
  public function run(Environment $environment)
  {
  ...
  try {
     return $endResult; //returns results, without subtasks, no error on shutdown
     } catch (\Throwable $t) {
            Helper::flattenThrowableBacktrace($t);
            return $t;
     }
  ...
  }
}

Let's say an application has a method App::start() that creates an array of FirstLevel tasks and passes it to AmpCoroutinePerformer::perform. The perform method returns an array of $results, one per FirstLevel::run(), without an exception as long as these tasks don't run subtasks.

Sometimes, inside FirstLevel::run(), I also use AmpCoroutinePerformer::perform for a list of SecondLevel tasks. The $results of these second-level tasks (obtained from each SecondLevel::run() via a loop) come back fine. They are merged into one result and returned to the first-level loop, and I can see the debug message "A result is obtained for task..." (see the code above, after $result = yield $pool->enqueue($task)). But when the return yield $pool->shutdown(); line is executed, I get:

Uncaught Error in child process or thread with the message "Loop stopped without resolving the promise" and code "0"; use Amp\Parallel\Sync\ContextPanicError::getOriginalTrace() for the stack trace in the child process or thread

#0 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Sync/ExitFailure.php(39): Amp\Parallel\Sync\ExitFailure->createException()
#1 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(315): Amp\Parallel\Sync\ExitFailure->getResult()
#2 [internal function]: Amp\Parallel\Context\Process->Amp\Parallel\Context\{closure}()
#3 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(118): Generator->send(0)
#4 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(46): Amp\Coroutine->Amp\{closure}(NULL, Object(Amp\Parallel\Sync\ExitFailure))
#5 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(151): Amp\Coroutine->onResolve(Object(Closure))
#6 /var/www/myproject/tools/web/vendor/amphp/amp/lib/functions.php(96): Amp\Coroutine->__construct(Object(Generator))
#7 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(316): Amp\call(Object(Closure))
#8 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(265): Amp\Parallel\Context\Process->join()
#9 [internal function]: Amp\Parallel\Context\Process->Amp\Parallel\Context\{closure}()
#10 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(115): Generator->throw(Object(Amp\Parallel\Sync\ChannelException))
#11 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Sync\ChannelException), NULL)
#12 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#13 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure))
#14 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Parallel\Sync\ChannelException))
#15 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\ByteStream\StreamException), NULL)
#16 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#17 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(177): class@anonymous->resolve(Object(Amp\Failure))
#18 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Deferred.php(65): class@anonymous->fail(Object(Amp\ByteStream\StreamException))
#19 /var/www/myproject/tools/web/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(129): Amp\Deferred->fail(Object(Amp\ByteStream\StreamException))
#20 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/NativeDriver.php(216): Amp\ByteStream\ResourceOutputStream::Amp\ByteStream\{closure}('ef', Resource id #1369, NULL)
#21 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/NativeDriver.php(96): Amp\Loop\NativeDriver->selectStreams(Array, Array, 0.99)
#22 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/Driver.php(138): Amp\Loop\NativeDriver->dispatch(true)
#23 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick()
#24 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop.php(95): Amp\Loop\Driver->run()
#25 /var/www/myproject/tools/web/app/Services/Coroutine/AmpCoroutinePerformer.php(96):

As you can see, I have to use try...catch and call $pool->kill(). After that I can use my final $results and script execution continues. One more note: FirstLevel tasks don't always use SecondLevel subtasks within the same test run. In my real case I have 14 FirstLevel tasks, and only the last (14th) one runs SecondLevel subtasks.

Please tell me what I'm doing wrong here. Even if you can't tell me how to fix my issue, please point out my mistakes or any known issues with the amphp features, or anything that isn't best practice regarding promises/generators or something else, especially inside AmpCoroutinePerformer::perform.

@spiritix, sorry if my issue is not relevant to yours; I just don't want to create a duplicate. If you guys think this is a different issue, please tell me and I will create a new one.

Thank you!

7krasov, Mar 24 '21 21:03

I have the same problem. Here is my output:

Fatal error:  Uncaught Amp\Parallel\Worker\WorkerException: The worker crashed in vendor\amphp\parallel\lib\Worker\TaskWorker.php:111
Stack trace:
#0 [internal function]: Amp\Parallel\Worker\TaskWorker->Amp\Parallel\Worker\{closure}()
#1 vendor\amphp\amp\lib\Coroutine.php(115): Generator->throw(Object(Amp\Parallel\Worker\WorkerException))
#2 vendor\amphp\amp\lib\Internal\ResolutionQueue.php(70): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Worker\WorkerException), NULL)
#3 vendor\amphp\amp\lib\Failure.php(33): Amp\Internal\ResolutionQueue->__invoke(Object(Amp\Parallel\Worker\WorkerException), NULL)
#4 vendor\amphp\amp\lib\Internal\Placeholder.php(143): Amp\Failure->onResolve(Object(Amp\Internal\ResolutionQueue))
#5 vendor\amphp\amp\lib\Internal\Placeholder.php(177): Amp\Coroutine->resolve( in vendor\amphp\parallel\lib\Worker\TaskWorker.php on line 111

At first I thought there might be a timeout for the callable, but when I increased only the number of loop iterations, the error appeared after a short time, so I suspect that the queue size or the number of workers is causing the issue instead.

Besides that: I use the framework on Windows and all processor cores are used. The results come damned fast, and apart from this issue I'm quite satisfied with the library.

DavidBruchmann, Jul 25 '21 15:07

This is an example function where the error occurs:

function testMersennePrimes($base = 2, $start = 1, $end = 1257787)
{
        $exponent = $start;
        $promises = [];
        while ($exponent <= $end) {
            if (gmp_prob_prime($exponent, 10)) {
                $promises[(string) $exponent] = Worker\enqueueCallable('gmp_prob_prime', bcsub(bcpow($base, $exponent), 1), 10);
            }
            $exponent = gmp_nextprime($exponent);
            if ($exponent > 15000) {
                usleep(300000); // 0.3 s; sleep() only takes whole seconds, so sleep(0.3) would not pause at all
            }
        }
        $responses = Promise\wait(Promise\all($promises));
        foreach ($responses as $exponent =>  $response) {
            echo '$exponent: ' . $exponent . ': ' . ($response ? 'is prime' : '') . "\n";
        }
}

DavidBruchmann, Jul 25 '21 16:07

You're currently submitting all jobs at once, so they all need to be stored in memory. You can rewrite it to use an iterator instead to save memory.

kelunik, Jul 25 '21 18:07
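
A minimal sketch of the iterator approach for Amp v2, not taken from this thread. It assumes amphp/sync 1.4 or later is installed alongside amphp/parallel (for Amp\Sync\ConcurrentIterator\each and LocalSemaphore), plus ext-gmp. The helper name candidateExponents, the exponent range and the limit of 8 are all illustrative. Candidates come from a generator instead of a prebuilt array, and the semaphore caps the number of tasks in flight, so the full job list never has to sit in memory at once.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Iterator;
use Amp\Loop;
use Amp\Parallel\Worker;
use Amp\Sync\ConcurrentIterator;
use Amp\Sync\LocalSemaphore;

// Hypothetical helper: lazily yields prime exponents instead of building a full array.
function candidateExponents(int $start, int $end): \Generator
{
    for ($exponent = $start; $exponent <= $end; $exponent++) {
        if (gmp_prob_prime($exponent, 10)) {
            yield $exponent;
        }
    }
}

Loop::run(function () {
    $iterator = Iterator\fromIterable(candidateExponents(2, 2000));

    // The semaphore allows at most 8 tasks in flight; further exponents are
    // pulled from the generator only as slots free up.
    yield ConcurrentIterator\each($iterator, new LocalSemaphore(8), function (int $exponent) {
        // Pass the number as a string so it serializes cleanly to the worker.
        $mersenne = gmp_strval(gmp_sub(gmp_pow(2, $exponent), 1));
        $isPrime = yield Worker\enqueueCallable('gmp_prob_prime', $mersenne, 10);

        echo "2^{$exponent} - 1 is ", ($isPrime ? 'probably prime' : 'composite'), "\n";
    });
});
```

Changing the argument of LocalSemaphore is the knob for how many items run in parallel; results are printed as they complete rather than collected into one large array.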

Thanks, I had already considered memory to be the problem. Your hint helps; I'll try using an iterator as proposed.

DavidBruchmann, Jul 25 '21 19:07

I am facing a similar issue and I'll try out this Iterator approach.

One of my goals is to be able to process, say, 8 items in parallel. With an Iterator, will I be able to specify how many items get processed in parallel?

jigarius, Sep 08 '22 13:09

If you're on PHP 8.1, I'd recommend using AMPHP v3 with amphp/pipeline + amphp/process: https://github.com/amphp/pipeline/blob/597747610fb5ce9322db88d9e65ef5a060d593a4/examples/concurrent.php

kelunik, Sep 08 '22 17:09
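
For orientation, a rough sketch of what the v3 style can look like, assuming amphp/pipeline 1.x on PHP 8.1+. The item range, the limit of 8 and the delay() call (a stand-in for real work such as submitting a task to a worker) are illustrative; the linked example remains the reference.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Items are produced lazily by the generator.
$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 1; $i <= 32; $i++) {
        yield $i;
    }
});

$results = $pipeline
    ->concurrent(8) // process up to 8 items at the same time
    ->map(function (int $item): int {
        delay(0.5); // placeholder for real work; suspends only this fiber
        return $item * 2;
    });

foreach ($results as $result) {
    echo $result, "\n";
}
```

The concurrent(8) call is what answers the "8 items in parallel" question in this style: the map callback runs for up to 8 items at once while the generator is consumed on demand.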