Rebus.RabbitMq icon indicating copy to clipboard operation
Rebus.RabbitMq copied to clipboard

Poison handling with RabbitMQ Qourum queues

Open zlepper opened this issue 1 year ago • 4 comments

Greetings :D

I created this a long time ago for the main Rebus repo: https://github.com/rebus-org/Rebus/issues/1044 where we agreed that there is no way Rebus itself can solve this.

However now that Qourum queues are well supported in RabbitMQ there is actually something we can do. From the docs here: https://www.rabbitmq.com/quorum-queues.html#poison-message-handling it is possible to check the x-delivery-count header. I have tacked together some code to do it, however i'm not sure if it's something that should be integrated in Rebus.RabbitMq directly:

await using var services = new ServiceCollection()
    .AddRebus(r =>
    {
        return r.Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost:5672", "deliver-test").InputQueueOptions(
            o => { o.AddArgument("x-queue-type", "quorum"); })).Options(o =>
        {
            o.Decorate<IPipeline>(c =>
            {
                var pipeline = c.Get<IPipeline>();

                var injector = new PipelineStepInjector(pipeline);

                injector.OnReceive(new TooManyDeliveryAttemptsRejector(), PipelineRelativePosition.After,
                    typeof(DefaultRetryStep));

                return injector;
            });
            
            o.Decorate<ITransport>(c =>
            {
                var transport = c.Get<ITransport>();

                return new ClearDeliveryHeaderTransport(transport);
            });
            
            o.LogPipeline();
        });
    })
    .AddRebusHandler<MyHandler>()
    .BuildServiceProvider(true);

foreach (var hostedService in services.GetServices<IHostedService>())
{
    await hostedService.StartAsync(default);
}

var bus = services.GetRequiredService<IBus>();

// await bus.SendLocal("a message");
//
await Task.Delay(100000000);


class MyHandler : IHandleMessages<string>
{
    public Task Handle(string message)
    {
        Span<byte> lotsOfData = stackalloc byte[100_000_000];
        Console.WriteLine($"Received message: {message}, {lotsOfData.Length}");
        return Task.CompletedTask;
    }
}

[StepDocumentation("""
                   Rejects messages that have been delivered too many times.

                   If Rebus has not handled this using the normal error strategies it might be because the message is killing
                   the process preventing Rebus from handling it.
                   """)]
class TooManyDeliveryAttemptsRejector : IIncomingStep
{
    private const int MaxDeliveryAttempts = 5;

    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (message == null)
        {
            await next();
            return;
        }

        var headers = message.Headers;

        if (headers.TryGetValue(MoreRabbitMqHeader.DeliveryCount, out var deliveryCount) &&
            int.TryParse(deliveryCount, out var count))
        {
            if (count > MaxDeliveryAttempts)
            {
                throw new MaxDeliverAttemptsExceededException(
                    $"Max delivery attempts exceeded of {MaxDeliveryAttempts}. Deliver attempt count is {count}.");
            }
        }

        await next();
    }
}

class ClearDeliveryHeaderTransport : ITransport
{
    private readonly ITransport _transportImplementation;

    public ClearDeliveryHeaderTransport(ITransport transportImplementation)
    {
        _transportImplementation = transportImplementation;
    }

    public void CreateQueue(string address)
    {
        _transportImplementation.CreateQueue(address);
    }

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
        message.Headers.Remove(MoreRabbitMqHeader.DeliveryCount);

        return _transportImplementation.Send(destinationAddress, message, context);
    }

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
    {
        return _transportImplementation.Receive(context, cancellationToken);
    }

    public string Address => _transportImplementation.Address;
}

public class MoreRabbitMqHeader
{
    /// <summary>
    /// How many times rabbitmq has attempted to deliver this message.
    /// This is only supported when working with Quorum queues.
    /// </summary>
    public const string DeliveryCount = "x-delivery-count";
}

public class MaxDeliverAttemptsExceededException : Exception, IFailFastException
{
    public MaxDeliverAttemptsExceededException(string? message) : base(message)
    {
    }
}

This has to be tested in a separate process and not a unit test to really test it, as it has to take down the entire process for Rabbit to do the incrementing.

I couldn't find a way to mutate headers when handling error messages except for wrapping the transport itself and having it clear the header. Outgoing pipeline decorations seems to be ignored when processing error messages.

It should probably also read the max attempts from the retry strategy, but i tried to keep it as simple as possible.

zlepper avatar Dec 15 '23 08:12 zlepper