psr-container-messenger icon indicating copy to clipboard operation
psr-container-messenger copied to clipboard

bug: When dispatch to Rabbimq failing, Messages get lost

Open notdefine opened this issue 1 year ago • 1 comments

We found a constelation, when using RabbitMQ with the transport option

    'messenger' => [
        'transports' => [
             'options' => ['confirm_timeout' => 1]
        ]
]

no retry is made. So we add a middleware to retry this dispatch. This middleware must be added to the command bus configuration.

<?php

declare(strict_types=1);

namespace Mehrkanal\Messenger\Middleware;

use AMQPQueueException;
use Psr\Log\LoggerInterface;
use RuntimeException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Throwable;

final class ConfirmTimeoutMiddleware implements MiddlewareInterface
{
    public const MAX_PUBLISH_TRIES = 5;

    public function __construct(private LoggerInterface $logger)
    {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        for ($try = 0; $try < self::MAX_PUBLISH_TRIES; $try++) {
            try {
                return $stack->next()
                    ->handle($envelope, $stack);
            } catch (AMQPQueueException|TransportException $exception) {
                $this->logger->warning(
                    'RabbitMQ publishing retry.',
                    [
                        'class' => self::class,
                        'exception_class' => $exception::class,
                        'exception_message' => $exception->getMessage(),
                        'exception_trace' => $exception->getTrace(),
                        'envelop' => $envelope,
                        'try' => $try,
                    ]
                );
            } catch (Throwable $throwable) {
                $this->logger->critical('RabbitMQ publishing exception.', [
                    'class' => self::class,
                    'envelop' => $envelope,
                    'exception_class' => $throwable::class,
                    'exception_message' => $throwable->getMessage(),
                    'exception_trace' => $throwable->getTrace(),
                ]);
                throw new RuntimeException('Publishing failed', previous: $throwable);
            }
        }
        $this->logger->critical('RabbitMQ publishing failed.', [
            'class' => self::class,
            'envelop' => $envelope,
        ]);
        throw new RuntimeException('Publishing failed');
    }
}

Since this project has no recent activity, and there are no responses to Merge Requests I place my knowlege her. Because we had some trouble with missing Messages.

greetings Thomas

notdefine avatar Feb 02 '24 10:02 notdefine

Thank you for your contribution @notdefine. We have done something similar in the past, but the solution was never submitted here.

geerteltink avatar Jun 19 '24 11:06 geerteltink