InboxPattern delivers the same Etos twice when the RabbitMQ connection is interrupted
Is there an existing issue for this?
- [X] I have searched the existing issues
Description
I have created a sample application to demonstrate the issue. See AbpIssueInboxPatternNotPreventingDuplication
The application has
- InboxPattern
- OutboxPattern
- a DistributedLockProvider for SQL set up

The app sends an exponentially increasing number of PingEtos and PongEtos to generate some artificial traffic on the distributed event bus. If you keep it running, it will exit gracefully on its own after receiving 2048 PongEto messages.
Reproduction of the issue - see below.
When the RabbitMQ connection is restored, AMQP messages that had already been received before the connection was lost can be redelivered. The inbox pattern should prevent those events from being processed twice by an EventHandler, but somehow it does not.
Is this a bug, or am I missing something about how the InboxPattern should be set up correctly?
Reproduction Steps
- check out the AbpIssueInboxPatternNotPreventingDuplication app
- have a RabbitMQ installation running at localhost using the default credentials (guest, guest) - or adapt the appsettings.json accordingly
- run "dotnet run --migrate-database" from within the "AbpIssue" project folder inside the solution
- place breakpoints in class AbpIssue.EventHandler.AbstractEventHandler in method SequenceNumberDuplicationCheck():
  - in line 28
  - in line 36
- run the app with launch settings "AbpIssue" from Visual Studio
- "force close" the RabbitMQ connection from the RabbitMQ management web interface right while the console says the app is processing events from the inbox:
```
[17:55:28 INF] Processed the incoming event with id = 4ebfb8d044761169c4ff3a15601e61f7
[17:55:28 INF] Processed the incoming event with id = f550a456ee8ee9c0f35b3a15601e6200
[17:55:28 INF] Processed the incoming event with id = 915e68fc3002dad04de13a15601e620a
[17:55:28 INF] Processed the incoming event with id = eb5b4979e3dcc2410b8a3a15601e6213
[17:55:28 INF] Processed the incoming event with id = eb453915871d48b591be3a15601e621c
[17:55:28 INF] Processed the incoming event with id = 209b147da307591668e53a15601e6226
[17:55:28 INF] Processed the incoming event with id = e5e2ebf385a38620f7ba3a15601e6232
[17:55:28 INF] Processed the incoming event with id = c3d60576f5a436cb2e613a15601e623d
```
The issue is more likely to occur the more messages are still waiting in the RabbitMQ queue at that moment (e.g. right after the demo app has published 2048 PingEtos).
The breakpoints above are being hit because at least one EventHandler observes the same Eto (with the same SequenceNumber) twice.
Expected behavior
PingEventHandler and PongEventHandler should never process the same Eto twice.
Actual behavior
PingEventHandler or PongEventHandler observe the same Eto (having the same SequenceNumber) twice.
Regression?
No response
Known Workarounds
No response
Version
8.3.1
User Interface
Common (Default)
Database Provider
EF Core (Default)
Tiered or separate authentication server
None (Default)
Operation System
Windows (Default)
Other information
No response
I have added some additional logging, especially to the DbContextEventInbox used by ABP. In the log below you can see that the same messageId gets inserted twice after force-closing the RabbitMQ connection:
```
2024-10-03 13:08:47.138 +02:00 [DBG] DbContextEventInbox.EnqueueAsync(incomingEvent.MessageId: bb52235768a2648fa3743a15643d29e4) (threadId: 18)
2024-10-03 13:08:47.138 +02:00 [INF] Processed the incoming event with id = ad54f89c28ad512e21753a15643e3ee1
2024-10-03 13:08:47.138 +02:00 [DBG] Received PingEto { SequenceNumber: 5183, PackageIndex: 1089, PackageSize: 2048 }
2024-10-03 13:08:47.141 +02:00 [DBG] DbContextEventInbox.ExistsByMessageIdAsync(messageId: bb52235768a2648fa3743a15643d29e4) -> False (threadId: 5)
2024-10-03 13:08:47.145 +02:00 [DBG] DbContextEventInbox.EnqueueAsync(incomingEvent.MessageId: bb52235768a2648fa3743a15643d29e4) (threadId: 19)
```
It seems that DistributedEventBusBase.AddToInboxAsync() does not insert items in a thread-safe way.
The calls to eventInbox.ExistsByMessageIdAsync() and eventInbox.EnqueueAsync() are not synchronized, so it is possible for two threads to both "see" no entry with that messageId and then both decide to insert a new entry.
See https://github.com/abpframework/abp/blob/6a810dbed50f8b8d9a8e0914f56825ac6922ef20/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs#L173 and https://github.com/abpframework/abp/blob/6a810dbed50f8b8d9a8e0914f56825ac6922ef20/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs#L187
I think the AutoRecovering of the RabbitMQ connection is receiving the redelivered messages in a new thread while the thread of the old connection is still busy inserting messages into the inbox.
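The check-then-act race described above can be reproduced without RabbitMQ or ABP. The following is a minimal sketch under stated assumptions: `InMemoryInbox` and `InboxRaceDemo` are hypothetical names whose `ExistsByMessageIdAsync`/`EnqueueAsync` only mimic the shape of the inbox calls (they are not ABP's types), and `Task.Delay(50)` stands in for database latency between the check and the insert.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for an event inbox (not ABP's IEventInbox/DbContextEventInbox).
// Each call is atomic on its own, like a single database statement.
public sealed class InMemoryInbox
{
    private readonly List<string> _messageIds = new();
    private readonly object _sync = new();

    public Task<bool> ExistsByMessageIdAsync(string messageId)
    {
        lock (_sync) { return Task.FromResult(_messageIds.Contains(messageId)); }
    }

    public Task EnqueueAsync(string messageId)
    {
        lock (_sync) { _messageIds.Add(messageId); }
        return Task.CompletedTask;
    }

    public int CountOf(string messageId)
    {
        lock (_sync) { return _messageIds.FindAll(id => id == messageId).Count; }
    }
}

public static class InboxRaceDemo
{
    // Check-then-act with no synchronization around the pair of calls.
    public static async Task<bool> AddToInboxUnsafeAsync(InMemoryInbox inbox, string messageId)
    {
        if (await inbox.ExistsByMessageIdAsync(messageId))
        {
            return false; // already in the inbox: treated as a duplicate
        }

        await Task.Delay(50); // simulated latency between the check and the insert
        await inbox.EnqueueAsync(messageId);
        return true;
    }

    // Delivers the same message twice concurrently, e.g. an original delivery plus a redelivery.
    public static async Task<int> DeliverTwiceAsync(string messageId)
    {
        var inbox = new InMemoryInbox();
        await Task.WhenAll(
            AddToInboxUnsafeAsync(inbox, messageId),
            AddToInboxUnsafeAsync(inbox, messageId));
        return inbox.CountOf(messageId);
    }
}
```

Each individual call is atomic, but the pair is not: both deliveries see `ExistsByMessageIdAsync` return false before either has inserted, so `DeliverTwiceAsync("msg-1")` returns 2, the same pattern as the two `EnqueueAsync` entries for one messageId in the log above.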
hi @a-herbst
Sorry for the delay.
> I think the AutoRecovering of the RabbitMQ connection is receiving the redelivered messages in a new thread while the thread of the old connection is still busy inserting messages into the inbox.
We will confirm that.
Can you try locking the ProcessEventAsync method by copying the whole source code of RabbitMqDistributedEventBus?
```csharp
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(MyRabbitMqDistributedEventBus))]
public class MyRabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    // ...

    private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
    {
        // Note: as written this does not compile, because C# does not allow
        // await inside a lock statement (compiler error CS1996).
        lock (this)
        {
            var eventName = ea.RoutingKey;
            var eventType = EventTypes.GetOrDefault(eventName);
            if (eventType == null)
            {
                return;
            }

            var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
            var correlationId = ea.BasicProperties.CorrelationId;
            if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId))
            {
                return;
            }

            using (CorrelationIdProvider.Change(correlationId))
            {
                await TriggerHandlersDirectAsync(eventType, eventData);
            }
        }
    }

    // ...
}
```
When I copy the whole source code of the class mentioned, I get build errors due to the missing IRabbitMqDistributedEventBus interface.
- I had to copy the source code of the following ABP modules to make my test project compile:
  - Volo.Abp.RabbitMQ
  - Volo.Abp.EventBus.RabbitMQ
- I replaced the ProcessEventAsync method with the code suggested by you
- I had to replace the `await` statements in your code with `Task<>.Result` and `Task.Wait()`, since `await` cannot be used inside `lock` blocks.
With this fix applied I can't reproduce the issue anymore. If you would like to take a look, I have uploaded a branch with the fix applied to the test project: https://github.com/a-herbst/AbpIssueInboxPatternNotPreventingDuplication/tree/fix/synchronize-ProcessEventAsync
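Blocking on `Task<>.Result` and `Task.Wait()` inside a `lock` works, but it holds a thread for the whole duration of each inbox insert. A common async-compatible alternative is `SemaphoreSlim(1, 1)` with `WaitAsync()`/`Release()` in `try`/`finally`, which allows `await` while the gate is held. The sketch below is hypothetical: `SerializedInbox` is an in-memory stand-in, not ABP's DbContextEventInbox, and, like the `lock` fix, it only serializes callers within one process.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory inbox (not ABP's DbContextEventInbox). The duplicate check and the
// insert run while holding one SemaphoreSlim, so two concurrent deliveries of the same
// MessageId cannot both pass the check. This only serializes callers inside one process.
public sealed class SerializedInbox
{
    // SemaphoreSlim(1, 1) behaves like an async-compatible mutex: await is allowed while it is held.
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly List<string> _messageIds = new();

    // Returns true if the message was added, false if it was already in the inbox.
    public async Task<bool> AddToInboxAsync(string messageId)
    {
        await _gate.WaitAsync(); // waiting callers get an incomplete task instead of blocking a thread
        try
        {
            if (_messageIds.Contains(messageId))
            {
                return false;
            }

            await Task.Delay(50); // simulated latency between the check and the insert
            _messageIds.Add(messageId);
            return true;
        }
        finally
        {
            _gate.Release(); // always release, even if the body throws
        }
    }

    public async Task<int> CountOfAsync(string messageId)
    {
        await _gate.WaitAsync();
        try
        {
            return _messageIds.FindAll(id => id == messageId).Count;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Awaiting `Task.WhenAll(inbox.AddToInboxAsync("msg-1"), inbox.AddToInboxAsync("msg-1"))` yields true for the first call and false for the second, leaving one entry. Applied to the ProcessEventAsync override, the same idea would replace `lock (this) { ... }` with `await _gate.WaitAsync(); try { ... } finally { _gate.Release(); }` around the existing body, keeping the original `await` calls; this is a suggestion that has not been verified against the sample app.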