parallel
parallel copied to clipboard
The worker crashed / Loop exceptionally stopped without resolving the promise
I'm continuously getting the following errors:
The worker crashed - amphp/parallel/lib/Worker/TaskWorker.php:111 Loop exceptionally stopped without resolving the promise - amphp/amp/lib/functions.php:231 Worker in pool exited unexpectedly with code -1 - amphp/parallel/lib/Worker/DefaultPool.php:254
Unfortunately I can't reproduce these issues, sometimes it works, sometimes it doesn't. Seems to be related with current RAM and CPU usage.
How can I further debug these issues? I don't see any further error messages from the workers themselves.
Do you have PHP error logs enabled? Do these reveal anything?
Yeah, I'm actually even using Papertrail to collect all logs. Nothing relevant in there.
Hi @kelunik , I'm facing the same issue too, on $pool->shutdown()
I don't have neither parallel
nor pthreads
extensions so WorkerProcess
is being used
class AmpCoroutinePerformer
{
...
public function perform(array $tasks, int $maxCoroutinesCount): array
{
$pool = new DefaultPool($maxCoroutinesCount, $this->workerFactory);
$results = [];
Loop::run(
function () use ($pool, $tasks, &$results) {
// Creates a context using Process, or if ext-parallel is installed, Parallel.
$coroutines = [];
foreach ($tasks as $taskKey => $task) {
$coroutines[$taskKey] = call(
function () use ($pool, $task, $taskKey) {
$result = yield $pool->enqueue($task);
$this->logDebug(sprintf('A result is obtained for task %s', $taskKey));
return $result;
}
);
}
$results = yield all($coroutines);
try {
return yield $pool->shutdown();
} catch (Throwable $t) {
$data['trace_string'] = $t instanceof TaskFailureThrowable
? $t->getOriginalTraceAsString()
: $t->getTraceAsString();
$this->logDebug(
sprintf(
'An "%s" throwable has been caught on workers shutdown. Message: %s',
get_class($t),
$t->getMessage()
),
$data
);
return new Promise(
function () use ($pool) {
$this->logDebug('Performing kill ()...');
$pool->kill();
}
);
}
}
);
return $results;
}
}
class App {
public function start()
{
...
/** @var FirstLevel[] $tasks */
$results = $tasksPerformer->perform($tasks, $maxTasksCount);
//no error on the loop shutdown without second-level loop usage
//error exists on the loop shutdown when using second-level loop for subtask
...
}
}
class FirstLevel implements \Amp\Parallel\Worker\Task
{
public function run(Environment $environment)
{
...
try {
//return $endResult; //returns results, without subtasks, no error on shutdown
/** @var SecondLevel[] $tasks */
$results = $tasksPerformer->perform($tasks, $maxTasksCount);
$endResult = $this->mergeResults($results); //here we use second-level loop (run subtasks)
return $endResult; //returns results, with subtasks, error on shutdown in the first-level loop
} catch (\Throwable $t) {
Helper::flattenThrowableBacktrace($t);
return $t;
}
...
}
}
class SecondLevel implements \Amp\Parallel\Worker\Task
{
public function run(Environment $environment)
{
...
try {
return $endResult //returns results, without subtasks, no error on shutdown
} catch (\Throwable $t) {
Helper::flattenThrowableBacktrace($t);
return $t;
}
...
}
}
Let's say an application has some method App::start()
which creates an array of tasks with class FirstLevel
and passes it to the AmpCoroutinePerformer::perform
. The preform
method returns an array of $results
returned by each FirstLevel::run()
without an exception when these tasks aren't run subtask
Sometimes inside the FirstLevel::run()
I use also AmpCoroutinePerformer::perform
for a list of SecondLevel
tasks. These second-level-tasks $results
(obtained from each SecondLevel::run()
by a loop) are obtained well. These results are being merged into one and returned to the first-level loop and I can see a debug message A result is obtained for task...
(see code above, after $result = yield $pool->enqueue($task)
). But, on return yield $pool->shutdown();
row is being performed, I'm having
Uncaught Error in child process or thread with the message "Loop stopped without resolving the promise" and code "0"; use Amp\Parallel\Sync\ContextPanicError::getOriginalTrace() for the stack trace in the child process or thread
#0 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Sync/ExitFailure.php(39): Amp\Parallel\Sync\ExitFailure->createException() #1 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(315): Amp\Parallel\Sync\ExitFailure->getResult() #2 [internal function]: Amp\Parallel\Context\Process->Amp\Parallel\Context\{closure}() #3 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(118): Generator->send(0) #4 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(46): Amp\Coroutine->Amp\{closure}(NULL, Object(Amp\Parallel\Sync\ExitFailure)) #5 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(151): Amp\Coroutine->onResolve(Object(Closure)) #6 /var/www/myproject/tools/web/vendor/amphp/amp/lib/functions.php(96): Amp\Coroutine->__construct(Object(Generator)) #7 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(316): Amp\call(Object(Closure)) #8 /var/www/myproject/tools/web/vendor/amphp/parallel/lib/Context/Process.php(265): Amp\Parallel\Context\Process->join() #9 [internal function]: Amp\Parallel\Context\Process->Amp\Parallel\Context\{closure}() #10 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(115): Generator->throw(Object(Amp\Parallel\Sync\ChannelException)) #11 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Sync\ChannelException), NULL) #12 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure)) #13 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure)) #14 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Parallel\Sync\ChannelException)) #15 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\ByteStream\StreamException), NULL) #16 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure)) #17 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Internal/Placeholder.php(177): class@anonymous->resolve(Object(Amp\Failure)) #18 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Deferred.php(65): class@anonymous->fail(Object(Amp\ByteStream\StreamException)) #19 /var/www/myproject/tools/web/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(129): Amp\Deferred->fail(Object(Amp\ByteStream\StreamException)) #20 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/NativeDriver.php(216): Amp\ByteStream\ResourceOutputStream::Amp\ByteStream\{closure}('ef', Resource id #1369, NULL) #21 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/NativeDriver.php(96): Amp\Loop\NativeDriver->selectStreams(Array, Array, 0.99) #22 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/Driver.php(138): Amp\Loop\NativeDriver->dispatch(true) #23 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick() #24 /var/www/myproject/tools/web/vendor/amphp/amp/lib/Loop.php(95): Amp\Loop\Driver->run() #25 /var/www/myproject/tools/web/app/Services/Coroutine/AmpCoroutinePerformer.php(96):
As you can see, I have to use try... catch
and run $pool->kill()
. Then I can use my end $results and the script execution is continuing
One more notice: FirstLevel tasks are not always using SecondLevel subtasks per the same test run. I would say in my real case I'm having 14 tasks of FirstLevel and only the last, 14-th runs SecondLevel subtask within
Please tell me what am I doing wrong here.
Even if you're not ready to answer how to fix my issue, just point me on my mistakes or some known issues with the amphp features if you see any not-best-practice regarding promises/generators or smth else, especially inside AmpCoroutinePerformer::perform
@spiritix sorry if my issue is not relevant to your issue, just don't want to create a duplicate. If you guys think this is another issue please tell me and I will create a new one
Thank you!
I've the same problem, here my output:
Fatal error: Uncaught Amp\Parallel\Worker\WorkerException: The worker crashed in vendor\amphp\parallel\lib\Worker\TaskWorker.php:111
Stack trace:
#0 [internal function]: Amp\Parallel\Worker\TaskWorker->Amp\Parallel\Worker\{closure}()
#1 vendor\amphp\amp\lib\Coroutine.php(115): Generator->throw(Object(Amp\Parallel\Worker\WorkerException))
#2 vendor\amphp\amp\lib\Internal\ResolutionQueue.php(70): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Worker\WorkerException), NULL)
#3 vendor\amphp\amp\lib\Failure.php(33): Amp\Internal\ResolutionQueue->__invoke(Object(Amp\Parallel\Worker\WorkerException), NULL)
#4 vendor\amphp\amp\lib\Internal\Placeholder.php(143): Amp\Failure->onResolve(Object(Amp\Internal\ResolutionQueue))
#5 vendor\amphp\amp\lib\Internal\Placeholder.php(177): Amp\Coroutine->resolve( in vendor\amphp\parallel\lib\Worker\TaskWorker.php on line 111
First I thought that there might be a timeout for the callable, but when I increased just the amount of loops the error was shown after short time already, so I suspect that the queue-size respectively the amount of workers is causing the issue instead.
Beside that: I use the framework in Windows and all processor-cores are used. The results are coming damned fast and apart from this issue I'm quite satisfied with the library.
this is an example function where the error is shown:
function testMersennePrimes($base = 2, $start = 1, $end = 1257787)
{
$exponent = $start;
$promises = [];
while ($exponent <= $end) {
if (gmp_prob_prime($exponent, 10)) {
$promises[(string) $exponent] = Worker\enqueueCallable('gmp_prob_prime', bcsub(bcpow($base, $exponent), 1), 10);
}
$exponent = gmp_nextprime($exponent);
if ($exponent > 15000) {
sleep(0.3);
}
}
$responses = Promise\wait(Promise\all($promises));
foreach ($responses as $exponent => $response) {
echo '$exponent: ' . $exponent . ': ' . ($response ? 'is prime' : '') . "\n";
}
}
You're currently submitting all jobs at once, so they all need to be stored in memory. You can rewrite it to use an iterator instead to save memory.
Thanks, I considered the memory already as problem. Your hint is helping, I'll try to use the iterator like proposed.
I am facing a similar issue and I'll try out this Iterator approach.
One of my goals is to be able to process, say, 8 items in parallel. With an Iterator, will I be able to specify how many items get processed in parallel?
If you're on PHP 8.1, I'd recommend using AMPHP v3 with amphp/pipeline
+ amphp/process
: https://github.com/amphp/pipeline/blob/597747610fb5ce9322db88d9e65ef5a060d593a4/examples/concurrent.php