log icon indicating copy to clipboard operation
log copied to clipboard

Amphp, Symfony and Monolog

Open Perf opened this issue 1 year ago • 1 comments

I am trying to make use of Amphp's StreamHanlder for async logging of messages.

Setup

  • Debian Bookworm
  • PHP 8.4
  • Symfony 7.2

Configs and sources composer.json

...
    "require": {
        ...
        "ext-eio": "*",
        "ext-ev": "*",
        "amphp/amp": "^3.1",
        "amphp/file": "^3.2",
        "amphp/log": "^2.0",
...

monolog.yaml

...
    monolog:
        handlers:
            main:
                handler: async
                type: buffer
                buffer_size: 1
                level: debug
                bubble: false
                flush_on_overflow: true
                channels: ["!event", "!deprecation"]
            async:
                type: stream
                path: "%kernel.logs_dir%/%kernel.environment%.log"
                bubble: false
...

AsyncStreamHandlerDecorator.php

<?php

declare(strict_types=1);

namespace App\Infrastructure\Logger\Handler;

use Amp\ByteStream\WritableResourceStream;
use Amp\Log\StreamHandler as AmphpStreamHandler;
use Monolog\Formatter\FormatterInterface;
use Monolog\Handler\FormattableHandlerInterface;
use Monolog\Handler\HandlerInterface;
use Monolog\Handler\ProcessableHandlerInterface;
use Monolog\Handler\StreamHandler;
use Monolog\LogRecord;
use Monolog\ResettableInterface;
use Symfony\Component\DependencyInjection\Attribute\AsDecorator;
use function Amp\ByteStream\getStderr;
use function Amp\File\openFile;

#[AsDecorator(decorates: 'monolog.handler.async')]
readonly class AsyncStreamHandlerDecorator implements HandlerInterface, ResettableInterface, ProcessableHandlerInterface, FormattableHandlerInterface
{
    private AmphpStreamHandler $asyncHandler;

    public function __construct(private StreamHandler $inner)
    {
        if ($stream = $this->inner->getStream()) {
            $sink = new WritableResourceStream($stream, $this->inner->getStreamChunkSize());
        } elseif ($url = $this->inner->getUrl()) {
            $sink = openFile($url, 'a');
        } else {
            $sink = getStderr();
        }

        $this->asyncHandler = new AmphpStreamHandler(
            $sink,
            $this->inner->getLevel(),
            $this->inner->getBubble(),
        );

        if ($formatter = $this->inner->getFormatter()) {
            $this->asyncHandler->setFormatter($formatter);
        }
    }

    public function isHandling(LogRecord $record): bool
    {
        return $this->asyncHandler->isHandling($record);
    }

    public function handle(LogRecord $record): bool
    {
        return $this->asyncHandler->handle($record);
    }

    public function handleBatch(array $records): void
    {
        $this->asyncHandler->handleBatch($records);
    }

    public function close(): void
    {
        $this->asyncHandler->close();
    }

    public function reset(): void
    {
        $this->asyncHandler->reset();
    }

    public function setFormatter(FormatterInterface $formatter): HandlerInterface
    {
        return $this->asyncHandler->setFormatter($formatter);
    }

    public function getFormatter(): FormatterInterface
    {
        return $this->asyncHandler->getFormatter();
    }

    public function pushProcessor(callable $callback): HandlerInterface
    {
        $this->asyncHandler->pushProcessor($callback);
    }

    public function popProcessor(): callable
    {
        return $this->asyncHandler->popProcessor();
    }
}

Result Logging works, I see messages are written to dev.log file. Amp\Log\StreamHandler->write() is called, so it seems correctly plugged into the Symfony and Monolog.

However, getting an error, when console command finishes:

In InvalidCallbackError.php line 33:

[Revolt\EventLoop\InvalidCallbackError (2)]
Invalid callback identifier a


Exception trace:
at /app/vendor/revolt/event-loop/src/EventLoop/InvalidCallbackError.php:33
Revolt\EventLoop\InvalidCallbackError::invalidIdentifier() at /app/vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:213
Revolt\EventLoop\Internal\AbstractDriver->enable() at /app/vendor/amphp/file/src/Internal/EioPoll.php:48
Amp\File\Internal\EioPoll->listen() at /app/vendor/amphp/file/src/Driver/EioFile.php:136
Amp\File\Driver\EioFile->push() at /app/vendor/amphp/file/src/Internal/QueuedWritesFile.php:71
Amp\File\Internal\QueuedWritesFile->write() at /app/vendor/amphp/file/src/Driver/StatusCachingFile.php:41
Amp\File\Driver\StatusCachingFile->write() at /app/vendor/amphp/log/src/StreamHandler.php:43
Amp\Log\StreamHandler->write() at /app/vendor/monolog/monolog/src/Monolog/Handler/AbstractProcessingHandler.php:44
Monolog\Handler\AbstractProcessingHandler->handle() at /app/vendor/monolog/monolog/src/Monolog/Handler/Handler.php:27
Monolog\Handler\Handler->handleBatch() at /app/src/Infrastructure/Logger/Handler/AsyncStreamHandlerDecorator.php:58
App\Infrastructure\Logger\Handler\AsyncStreamHandlerDecorator->handleBatch() at /app/vendor/monolog/monolog/src/Monolog/Handler/BufferHandler.php:97
Monolog\Handler\BufferHandler->flush() at /app/vendor/monolog/monolog/src/Monolog/Handler/BufferHandler.php:74
Monolog\Handler\BufferHandler->handle() at /app/vendor/monolog/monolog/src/Monolog/Logger.php:391
Monolog\Logger->addRecord() at /app/vendor/monolog/monolog/src/Monolog/Logger.php:594
Monolog\Logger->debug() at /app/vendor/doctrine/dbal/src/Logging/Connection.php:48
Doctrine\DBAL\Logging\Connection->exec() at /app/vendor/doctrine/dbal/src/Driver/Middleware/AbstractConnectionMiddleware.php:46
Doctrine\DBAL\Driver\Middleware\AbstractConnectionMiddleware->exec() at /app/vendor/symfony/doctrine-bridge/Middleware/Debug/DBAL3/Connection.php:73
Symfony\Bridge\Doctrine\Middleware\Debug\DBAL3\Connection->exec() at /app/vendor/doctrine/dbal/src/Connection.php:1216
Doctrine\DBAL\Connection->executeStatement() at /app/vendor/symfony/doctrine-messenger/Transport/Connection.php:425
Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection->executeStatement() at /app/vendor/symfony/doctrine-messenger/Transport/PostgreSqlConnection.php:142
Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection->unlisten() at /app/vendor/symfony/doctrine-messenger/Transport/PostgreSqlConnection.php:48
Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection->__destruct() at n/a:n/a

Perf avatar Mar 10 '25 21:03 Perf

And, when stopping symfony messenger gettins such an error:

In DriverSuspension.php line 83:

Must call resume() or throw() before calling suspend() again


messenger:consume [-l|--limit LIMIT] [-f|--failure-limit FAILURE-LIMIT] [-m|--memory-limit MEMORY-LIMIT] [-t|--time-limit TIME-LIMIT] [--sleep SLEEP] [-b|--bus BUS] [--queues QUEUES] [--no-reset] [--all] [--keepalive [KEEPALIVE]] [--] [<receivers>...]

Error {#487
#message: "Must call resume() or throw() before calling suspend() again"
#code: 0
#file: "./vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php"
#line: 83
trace: {
./vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:83 { …}
./vendor/amphp/amp/src/Future.php:251 { …}
./vendor/amphp/file/src/Internal/QueuedWritesFile.php:80 { …}
./vendor/amphp/file/src/Driver/StatusCachingFile.php:41 { …}
./vendor/amphp/log/src/StreamHandler.php:43 { …}
./vendor/monolog/monolog/src/Monolog/Handler/AbstractProcessingHandler.php:44 { …}
./vendor/monolog/monolog/src/Monolog/Handler/Handler.php:27 { …}
./src/Infrastructure/Logger/Handler/AsyncStreamHandlerDecorator.php:58 {
App\Infrastructure\Logger\Handler\AsyncStreamHandlerDecorator->handleBatch(array $records): void^
› {
›     $this->asyncHandler->handleBatch($records);
› }
arguments: {
$records: array:1 [ …1]
}
}
./vendor/monolog/monolog/src/Monolog/Handler/BufferHandler.php:97 { …}
./vendor/monolog/monolog/src/Monolog/Handler/BufferHandler.php:113 { …}
Monolog\Handler\BufferHandler->close() {}
}
}
PHP Fatal error:  Allowed memory size of 536870912 bytes exhausted (tried to allocate 36864 bytes) in /app/vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php on line 83

Perf avatar Mar 10 '25 22:03 Perf