NServiceBus.RabbitMQ

Automatic rate limiting feature might cause duplicate messages

Open timbussmann opened this issue 1 year ago • 1 comment

Describe the bug

Description

When using the automatic rate limiting feature with the RabbitMQ transport, the functionality might produce duplicate messages in some edge cases.

Expected behavior

When automatic rate limiting kicks in, message processing should be reduced to one message at a time, with the configured delay between processing attempts, until message processing succeeds again.

Actual behavior

When the configured consecutive failure limit is reached and rate limiting kicks in, a message that should be handed over to delayed retries or moved to the error queue can be duplicated. Messages that are handled via immediate retries are not affected. The duplication only happens at the moment the failure counter reaches the limit; further failures while rate limiting is active do not create duplicates.

This is caused by the implementation, which tries to reduce the transport's concurrency setting once the configured failure limit is reached. The concurrency change is achieved by reconnecting to the broker with adjusted concurrency settings. However, the reconnect happens while the failed message is still being processed. After the connection is reset, the error handling pipeline sends a copy of the message for delayed delivery or to the error queue, and the failed message is expected to be acknowledged afterward. Sending works over the new connection, but the message currently being processed cannot be acknowledged over the new connection. The original message therefore becomes available for processing again even though a copy has already been sent for delayed retries or to the error queue.
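
The sequence described above can be modeled with a small in-memory sketch. `FakeBroker`, `FakeChannel`, and `DuplicateScenario` are hypothetical stand-ins invented for illustration; they are not RabbitMQ.Client or NServiceBus types, and the real transport code is more involved. The sketch relies on only two AMQP behaviors: an acknowledgement has to be sent on the channel that delivered the message, and closing that channel returns its unacknowledged deliveries to the queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins; not RabbitMQ.Client or NServiceBus types.
public sealed class FakeChannel
{
    public bool IsOpen { get; private set; } = true;
    public void Close() => IsOpen = false;
}

public sealed class FakeBroker
{
    public Queue<string> InputQueue { get; } = new Queue<string>();
    public List<string> DelayedQueue { get; } = new List<string>();

    // An unacknowledged delivery is tied to the channel that received it.
    readonly Dictionary<FakeChannel, string> unacked = new Dictionary<FakeChannel, string>();

    public string Receive(FakeChannel channel)
    {
        var message = InputQueue.Dequeue();
        unacked[channel] = message;
        return message;
    }

    // Publishing works on any open channel.
    public void PublishDelayed(FakeChannel channel, string message)
    {
        if (!channel.IsOpen) throw new InvalidOperationException("channel closed");
        DelayedQueue.Add(message);
    }

    // An ack only succeeds on the still-open channel that delivered the message.
    public bool TryAck(FakeChannel channel)
    {
        if (!channel.IsOpen) return false;
        return unacked.Remove(channel);
    }

    // Closing a channel returns its unacknowledged delivery to the queue.
    public void Close(FakeChannel channel)
    {
        channel.Close();
        if (unacked.Remove(channel, out var message))
        {
            InputQueue.Enqueue(message);
        }
    }
}

public static class DuplicateScenario
{
    // Returns the number of copies in the input queue and in the delayed queue.
    public static (int Input, int Delayed) Run()
    {
        var broker = new FakeBroker();
        broker.InputQueue.Enqueue("msg-1");

        var oldChannel = new FakeChannel();
        var message = broker.Receive(oldChannel); // the message is now in flight

        // Failure limit reached: reconnect while the message is still in flight.
        broker.Close(oldChannel);
        var newChannel = new FakeChannel();

        // Recoverability sends a copy over the new connection...
        broker.PublishDelayed(newChannel, message);

        // ...but the original delivery can no longer be acknowledged.
        var acked = broker.TryAck(oldChannel);
        Console.WriteLine($"acked: {acked}");

        return (broker.InputQueue.Count, broker.DelayedQueue.Count);
    }
}
```

Calling `DuplicateScenario.Run()` leaves one copy of the message in the input queue and one in the delayed queue, which is the duplication this issue describes.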

Versions

  • 8.0
  • not yet verified on version 7

Steps to reproduce

With the following configuration:

        endpointConfiguration.Recoverability()
            .Delayed(d => d.NumberOfRetries(1))
            .Immediate(i => i.NumberOfRetries(1))
            .OnConsecutiveFailures(2,
                new RateLimitSettings(TimeSpan.FromSeconds(10),
                    token =>
                    {
                        Console.WriteLine("rate limit started");
                        return Task.CompletedTask;
                    }, token =>
                    {
                        Console.WriteLine("rate limit ended");
                        return Task.CompletedTask;
                    }));

send a message to a handler that throws an exception. The log output below shows the resulting behavior.
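
For reference, a handler along these lines reproduces the failure. The type names `MyCommand` and `MyCommandHandler` and the exception message are taken from the stack traces in the log output; the handler body itself is an assumption, since the original sample project is not included here. The snippet needs the NServiceBus package to be referenced.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Message type name as it appears in the log output.
public class MyCommand : ICommand
{
}

// Assumed handler body: always fails, so every processing attempt
// counts towards the consecutive failure limit.
public class MyCommandHandler : IHandleMessages<MyCommand>
{
    public Task Handle(MyCommand commandMessage, IMessageHandlerContext context)
    {
        throw new Exception("pewpew");
    }
}
```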

Relevant log output

2023-05-01 16:07:51.737 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:51.932 INFO  Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.MutateIncomingMessageBehavior.InvokeIncomingMessageMutators(IIncomingLogicalMessageContext context, Func`2 next)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.MutateIncomingTransportMessageBehavior.InvokeIncomingTransportMessagesMutators(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:07:51:736450 Z
	Handler failure time: 2023-05-01 14:07:51:767432 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:07:51.953 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:52.060 WARN  The consecutive failures circuit breaker is now in the armed state
2023-05-01 16:07:52.062 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:07:52.063 INFO  Calling a change concurrency and reconnecting with new value 1.
2023-05-01 16:07:52.076 INFO  'Samples.RabbitMQ.SimpleReceiver MessagePump': Attempting to reconnect in 10 seconds.
2023-05-01 16:08:02.080 WARN  Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:07:51:953324 Z
	Handler failure time: 2023-05-01 14:07:51:967382 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:08:02.103 INFO  'Samples.RabbitMQ.SimpleReceiver MessagePump': Connection to the broker reestablished successfully.
2023-05-01 16:08:02.106 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:08:02.214 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:02.217 WARN  Failed to acknowledge message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because the channel was closed. The message was returned to the queue.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text='Goodbye', classId=0, methodId=0
   at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
   at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory`1 body)
   at RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple)
   at NServiceBus.Transport.RabbitMQ.ModelExtensions.BasicAckSingle(IModel channel, UInt64 deliveryTag) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/ModelExtensions.cs:line 9
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 431
2023-05-01 16:08:12.227 WARN  Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:08:02:106038 Z
	Handler failure time: 2023-05-01 14:08:02:120234 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:08:12.237 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:12.416 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:22.423 INFO  Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:08:12:237766 Z
	Handler failure time: 2023-05-01 14:08:12:264706 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:08:22.426 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:22.593 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:32.608 INFO  Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:08:22:426253 Z
	Handler failure time: 2023-05-01 14:08:22:446579 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:08:32.610 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:32.769 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:42.777 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:08:32:610920 Z
	Handler failure time: 2023-05-01 14:08:32:629076 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
2023-05-01 16:08:42.794 INFO  Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:42.959 INFO  Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:52.974 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
   at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
   at (ArrayClosure, Object, Object, IMessageHandlerContext)
   at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
   at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
   at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
   at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
   at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
   at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
	Message type: MyCommand
	Handler type: MyCommandHandler
	Handler start time: 2023-05-01 14:08:42:794772 Z
	Handler failure time: 2023-05-01 14:08:42:813593 Z
	Handler canceled: False
	Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
	Pipeline canceled: False
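
The duplication is easiest to spot by counting recoverability outcomes per message ID. With `Delayed(d => d.NumberOfRetries(1))`, one "Delayed Retry will reschedule" entry and one "Moving message ... to the error queue" entry would be expected for the message, but the log above contains two of each. The following is a minimal sketch of such a count; `LogScan` is a hypothetical helper, and the pattern assumes the log line format shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper for reading a log like the one above.
public static class LogScan
{
    // Matches the two recoverability outcomes that appear in the log.
    static readonly Regex Outcome = new Regex(
        @"(Delayed Retry will reschedule message|Moving message) '(?<id>[0-9a-f-]+)'",
        RegexOptions.Compiled);

    // Counts how often each outcome was logged for each message ID.
    public static Dictionary<string, int> CountOutcomes(IEnumerable<string> lines)
    {
        var counts = new Dictionary<string, int>();
        foreach (var line in lines)
        {
            var match = Outcome.Match(line);
            if (!match.Success) continue;

            // Key format: "<outcome text> <message id>".
            var key = $"{match.Groups[1].Value} {match.Groups["id"].Value}";
            counts[key] = counts.TryGetValue(key, out var n) ? n + 1 : 1;
        }
        return counts;
    }
}
```

The method can be fed with `System.IO.File.ReadLines(path)` for a saved log file. Any count above the configured number of delayed retries, or more than one move to the error queue for the same ID, points at a duplicated message.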

Additional Information

Workarounds

Possible solutions

  • Ensure that the reconnect only happens after all remaining in-flight messages have been processed.
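
One way to picture this option is a counter of in-flight messages that the reconnect waits on. The following is only a sketch under stated assumptions: `InFlightTracker` and `ReconnectSketch` are hypothetical names, not part of NServiceBus.Transport.RabbitMQ, and the `reconnect` delegate is a placeholder for the transport's real reconnect logic.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper; not part of NServiceBus.Transport.RabbitMQ.
public sealed class InFlightTracker
{
    readonly object gate = new object();
    int inFlight;
    TaskCompletionSource<bool> drained;

    // Call when a message is handed to the processing pipeline.
    public void Enter()
    {
        lock (gate) { inFlight++; }
    }

    // Call after the message has been acknowledged or rejected.
    public void Exit()
    {
        TaskCompletionSource<bool> toComplete = null;
        lock (gate)
        {
            inFlight--;
            if (inFlight == 0 && drained != null)
            {
                toComplete = drained;
                drained = null;
            }
        }
        // Complete outside the lock to avoid running continuations under it.
        toComplete?.TrySetResult(true);
    }

    // Completes once no message is in flight.
    public Task WhenDrained()
    {
        lock (gate)
        {
            if (inFlight == 0) return Task.CompletedTask;
            drained ??= new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            return drained.Task;
        }
    }
}

public static class ReconnectSketch
{
    // Waits for in-flight messages to finish, then runs the reconnect placeholder.
    public static async Task ReconnectWhenIdle(InFlightTracker tracker, Func<Task> reconnect)
    {
        await tracker.WhenDrained().ConfigureAwait(false);
        await reconnect().ConfigureAwait(false);
    }
}
```

Two caveats apply to this sketch. The task returned by `ReconnectWhenIdle` must not be awaited from inside the processing of the failing message, because that message is itself in flight and the wait would never complete; the reconnect has to be scheduled and left to run after the message has been acknowledged. The sketch also does not stop new deliveries from arriving between the drain and the reconnect, which a real implementation would have to handle.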

Additional information

timbussmann, May 02 '23 15:05