
InboxPattern delivers the same Etos twice when the RabbitMQ connection is interrupted

Open • a-herbst opened this issue 1 year ago • 1 comment

Is there an existing issue for this?

  • [X] I have searched the existing issues

Description

I have created a sample application to demonstrate the issue. See the AbpIssueInboxPatternNotPreventingDuplication repository.

The application has the following set up:

  • InboxPattern
  • OutboxPattern
  • a DistributedLockProvider for SQL

It sends an exponentially increasing number of PingEto and PongEto messages to generate some artificial traffic on the distributed event bus. If you keep it running, it exits gracefully by itself after receiving 2048 PongEto messages.

Reproduction of the issue - see below.

When the RabbitMQ connection is restored, AMQP messages that had already been received before the connection was lost can be redelivered. The inbox pattern should prevent such events from being processed twice by an event handler, but somehow it does not.

Is this a bug, or am I missing something about how the InboxPattern should be set up?

Reproduction Steps

  • check out the AbpIssueInboxPatternNotPreventingDuplication app
  • have a RabbitMQ instance running on localhost with the default credentials (guest/guest), or adapt appsettings.json accordingly
  • run "dotnet run --migrate-database" from within the "AbpIssue" project folder inside the solution
  • place breakpoints in class AbpIssue.EventHandler.AbstractEventHandler in method SequenceNumberDuplicationCheck()
    • in line 28
    • in line 36
  • run the app with the "AbpIssue" launch settings from Visual Studio
  • force-close the RabbitMQ connection from the RabbitMQ management web interface while the console shows the app processing events from the inbox, e.g.:
[17:55:28 INF] Processed the incoming event with id = 4ebfb8d044761169c4ff3a15601e61f7
[17:55:28 INF] Processed the incoming event with id = f550a456ee8ee9c0f35b3a15601e6200
[17:55:28 INF] Processed the incoming event with id = 915e68fc3002dad04de13a15601e620a
[17:55:28 INF] Processed the incoming event with id = eb5b4979e3dcc2410b8a3a15601e6213
[17:55:28 INF] Processed the incoming event with id = eb453915871d48b591be3a15601e621c
[17:55:28 INF] Processed the incoming event with id = 209b147da307591668e53a15601e6226
[17:55:28 INF] Processed the incoming event with id = e5e2ebf385a38620f7ba3a15601e6232
[17:55:28 INF] Processed the incoming event with id = c3d60576f5a436cb2e613a15601e623d

The issue is more likely to occur the more messages are still waiting in the RabbitMQ queue at that moment (e.g. right after the demo app has published 2048 PingEtos).

The breakpoints above are being hit because at least one EventHandler observes the same Eto (with the same SequenceNumber) twice.
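The sample's AbstractEventHandler.SequenceNumberDuplicationCheck() is not reproduced in this issue. As a rough illustration of what such a check does, here is a minimal sketch, assuming each handler records the SequenceNumbers it has already seen in a thread-safe set. The names and the `long` type are assumptions, not the sample's actual code:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical duplicate detector keyed by the Eto's SequenceNumber (assumed to be a long).
// ConcurrentDictionary.TryAdd is atomic, so two concurrent handler threads
// cannot both register the same number as "first seen".
var seen = new ConcurrentDictionary<long, byte>();

// Returns true the first time a SequenceNumber is observed, false for any repeat.
bool IsFirstDelivery(long sequenceNumber) => seen.TryAdd(sequenceNumber, 0);

// SequenceNumber 5183 is taken from the log in this issue.
bool first = IsFirstDelivery(5183);
bool second = IsFirstDelivery(5183); // a redelivered copy of the same Eto

Console.WriteLine($"first: {first}, second: {second}");
// prints "first: True, second: False"
```

In the sample, the `false` branch is where a breakpoint would be hit.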

Expected behavior

PingEventHandler and PongEventHandler should never process the same Eto twice.

Actual behavior

PingEventHandler or PongEventHandler observe the same Eto (having the same SequenceNumber) twice.

Regression?

No response

Known Workarounds

No response

Version

8.3.1

User Interface

Common (Default)

Database Provider

EF Core (Default)

Tiered or separate authentication server

None (Default)

Operation System

Windows (Default)

Other information

No response

a-herbst, Oct 02 '24 16:10

I have added some additional logging, especially to the DbContextEventInbox used by ABP. The log below shows the same messageId being inserted twice, from two different threads, after force-closing the RabbitMQ connection:

2024-10-03 13:08:47.138 +02:00 [DBG] DbContextEventInbox.EnqueueAsync(incomingEvent.MessageId: bb52235768a2648fa3743a15643d29e4) (threadId: 18)
2024-10-03 13:08:47.138 +02:00 [INF] Processed the incoming event with id = ad54f89c28ad512e21753a15643e3ee1
2024-10-03 13:08:47.138 +02:00 [DBG] Received PingEto { SequenceNumber: 5183, PackageIndex: 1089, PackageSize: 2048 }
2024-10-03 13:08:47.141 +02:00 [DBG] DbContextEventInbox.ExistsByMessageIdAsync(messageId: bb52235768a2648fa3743a15643d29e4) -> False (threadId: 5)
2024-10-03 13:08:47.145 +02:00 [DBG] DbContextEventInbox.EnqueueAsync(incomingEvent.MessageId: bb52235768a2648fa3743a15643d29e4) (threadId: 19)

logs.txt

It seems that DistributedEventBusBase.AddToInboxAsync() does not insert items in a thread-safe way.

The calls to eventInbox.ExistsByMessageIdAsync() and eventInbox.EnqueueAsync() are not synchronized, so two threads can both "see" no existing entry with that messageId and then both insert a new entry.

See https://github.com/abpframework/abp/blob/6a810dbed50f8b8d9a8e0914f56825ac6922ef20/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs#L173 and https://github.com/abpframework/abp/blob/6a810dbed50f8b8d9a8e0914f56825ac6922ef20/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs#L187
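Reduced to its essentials, the check-then-insert race described above can be reproduced without RabbitMQ or a database. This is a minimal in-process sketch: the local functions only mimic the shape of eventInbox.ExistsByMessageIdAsync() and eventInbox.EnqueueAsync(), and the in-memory queue and the 50 ms delay are assumptions standing in for the real inbox table and its round-trip latency:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the inbox table. Each individual operation
// is thread-safe (like a single DB query), but check + insert is not atomic.
var inbox = new ConcurrentQueue<string>();

Task<bool> ExistsByMessageIdAsync(string messageId) =>
    Task.FromResult(inbox.Contains(messageId));

async Task EnqueueAsync(string messageId)
{
    await Task.Delay(50); // simulated database round-trip before the row is visible
    inbox.Enqueue(messageId);
}

// Unsynchronized check-then-insert, mirroring the pattern described above.
async Task AddToInboxAsync(string messageId)
{
    if (!await ExistsByMessageIdAsync(messageId))
    {
        await EnqueueAsync(messageId);
    }
}

// Message id taken from the log in this issue.
const string id = "bb52235768a2648fa3743a15643d29e4";

// Two concurrent deliveries of the same message (original + redelivered copy).
// Both existence checks run before either insert completes, so both insert.
await Task.WhenAll(AddToInboxAsync(id), AddToInboxAsync(id));

int rows = inbox.Count(x => x == id);
Console.WriteLine($"inbox rows for {id}: {rows}");
// prints "inbox rows for bb52235768a2648fa3743a15643d29e4: 2"
```

Any fix has to make the existence check and the insert behave as one atomic step for a given messageId.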

I think the AutoRecovering of the RabbitMQ connection is receiving the redelivered messages in a new thread while the thread of the old connection is still busy inserting messages into the inbox.

a-herbst, Oct 04 '24 06:10

hi @a-herbst

Sorry for the delay.

"I think the AutoRecovering of the RabbitMQ connection is receiving the redelivered messages in a new thread while the thread of the old connection is still busy inserting messages into the inbox."

We will confirm that.

Can you try copying the whole source code of RabbitMqDistributedEventBus into your project and wrapping the body of ProcessEventAsync in a lock?

[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(MyRabbitMqDistributedEventBus))]
public class MyRabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    // ...

    private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
    {
        // Intended to serialize event processing so that the inbox existence check
        // and the inbox insert cannot interleave between two consumer threads.
        // Note: C# does not allow `await` inside a `lock` body (compiler error CS1996),
        // so the awaited calls below must be adapted before this compiles.
        lock (this)
        {
            var eventName = ea.RoutingKey;
            var eventType = EventTypes.GetOrDefault(eventName);
            if (eventType == null)
            {
                return;
            }

            var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);

            var correlationId = ea.BasicProperties.CorrelationId;
            if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId))
            {
                return;
            }

            using (CorrelationIdProvider.Change(correlationId))
            {
                await TriggerHandlersDirectAsync(eventType, eventData);
            }
        }
    }

    // ...
}

maliming, Dec 24 '24 09:12

When I copy the whole source code of that class, I get build errors because the IRabbitMqDistributedEventBus interface is missing.

a-herbst, Jan 16 '25 08:01

  1. I had to copy the source code of the following ABP modules to make my test project compile:
    • Volo.Abp.RabbitMQ
    • Volo.Abp.EventBus.RabbitMQ
  2. I replaced the ProcessEventAsync method with the code you suggested.
  3. I had to replace the await statements in your code with Task<T>.Result and Task.Wait(), since await cannot be used inside lock blocks.

With this fix applied, I can't reproduce the issue anymore. If you'd like to take a look, I have uploaded a branch with the fix applied to the test project: https://github.com/a-herbst/AbpIssueInboxPatternNotPreventingDuplication/tree/fix/synchronize-ProcessEventAsync
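Blocking on .Result/.Wait() inside `lock` works, but it ties up a thread-pool thread for the whole database round-trip. A commonly used alternative, sketched here as a suggestion and not as anything ABP provides, is a `SemaphoreSlim(1, 1)` awaited with `WaitAsync()` and released in `finally`: it admits one caller at a time like `lock`, while keeping the original `await`s. The snippet is self-contained; `ProcessEventAsync` below is a hypothetical stand-in whose real body is replaced by a delay, and it only checks that at most one call is inside the critical section at any time:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// SemaphoreSlim(1, 1) admits one caller at a time, like `lock`,
// but it can be awaited and held across `await` points.
var gate = new SemaphoreSlim(1, 1);

int active = 0;    // callers currently inside the critical section
int maxActive = 0; // highest number of concurrent callers observed

// Hypothetical stand-in for ProcessEventAsync: the real body (deserialize,
// AddToInboxAsync, TriggerHandlersDirectAsync) is replaced by an awaited delay.
async Task ProcessEventAsync()
{
    await gate.WaitAsync();
    int now = Interlocked.Increment(ref active);
    try
    {
        maxActive = Math.Max(maxActive, now); // only updated while holding the gate
        await Task.Delay(10); // awaited work is allowed here, unlike inside `lock`
    }
    finally
    {
        Interlocked.Decrement(ref active);
        gate.Release();
    }
}

// Simulate 20 deliveries arriving concurrently, e.g. from two consumer threads.
await Task.WhenAll(Enumerable.Range(0, 20).Select(_ => ProcessEventAsync()));

Console.WriteLine($"max concurrent executions: {maxActive}");
// prints "max concurrent executions: 1"
```

In the copied RabbitMqDistributedEventBus, the same shape would wrap the original body of ProcessEventAsync. Like `lock`, this only serializes within one process; it does not coordinate several application instances consuming the same queue.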

a-herbst, Jan 16 '25 09:01