
Duplicate OrderId index in Saga tutorial

Open Vinko90 opened this issue 3 years ago • 2 comments

Hey guys,

We are learning NServiceBus in our organization because we would like to start using it in production. We recently completed the tutorial on sagas (specifically this one: https://docs.particular.net/tutorials/nservicebus-sagas/2-timeouts ), but an issue appears when we replace the LearningTransport and LearningPersistence with RabbitMQ and MongoDB (running in Docker).

The issue is that there is apparently a duplicate "OrderId_1" key in ShippingPolicyData, but we have no idea where it is coming from. We double-checked: our code is exactly the same as the tutorial code, and every message object has only one OrderId property.

Output:

2022-05-20 18:11:59.679 INFO  Auditing processed messages to 'audit'
2022-05-20 18:11:59.693 INFO  No valid license could be found. Falling back to trial license with start date '2022-05-12'.
Press Enter to exit.
2022-05-20 18:12:27.763 INFO  [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderPlaced
2022-05-20 18:12:27.763 INFO  [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderBilled
2022-05-20 18:12:27.867 INFO  Immediate Retry is going to retry message '345af3b7-5277-4c53-92af-ae9b010b1838' because of an exception:
MongoDB.Driver.MongoWriteException: A write operation resulted in an error.
E11000 duplicate key error collection: Shipping.shippingpolicydata index: OrderId_1 dup key: { OrderId: "45138491-1820-479b-a922-98a4d7f21138" }
---> MongoDB.Driver.MongoBulkWriteException`1[MongoDB.Bson.BsonDocument]: A bulk write operation resulted in one or more errors.
E11000 duplicate key error collection: Shipping.shippingpolicydata index: OrderId_1 dup key: { OrderId: "45138491-1820-479b-a922-98a4d7f21138" }
 at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
 at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
 --- End of inner exception stack trace ---
 at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
 at NServiceBus.Storage.MongoDB.SagaPersister.Save(IContainSagaData sagaData, SagaCorrelationProperty correlationProperty, SynchronizedStorageSession session, ContextBag context)
 at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 155
 at NServiceBus.SagaAudit.CaptureSagaStateBehavior.Invoke(IInvokeHandlerContext context, Func`1 next)
 at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 48
 at NServiceBus.ScheduledTaskHandlingBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Scheduling/ScheduledTaskHandlingBehavior.cs:line 22
 at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 16
 at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 34
 at ReceivePerformanceDiagnosticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
 at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 18
 at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
 at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 37
 at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
 at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 35
 at NServiceBus.TransportReceiver.InvokePipeline(MessageContext c) in /_/src/NServiceBus.Core/Transports/TransportReceiver.cs:line 58
 at NServiceBus.TransportReceiver.InvokePipeline(MessageContext c) in /_/src/NServiceBus.Core/Transports/TransportReceiver.cs:line 64
 at NServiceBus.Transport.RabbitMQ.MessagePump.Process(EventingBasicConsumer consumer, BasicDeliverEventArgs message, Byte[] messageBody) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 368
Exception details:
      Message ID: 345af3b7-5277-4c53-92af-ae9b010b1838
2022-05-20 18:12:27.943 INFO  [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderBilled
2022-05-20 18:12:28.017 INFO  [Order: 45138491-1820-479b-a922-98a4d7f21138] Order successfully shipped.

Our Program.cs:


    static async Task Main()
    { 
        Console.Title = "Sales";

        //Set service endpoint name
        var endpointConfiguration = new EndpointConfiguration("Sales");

        //Automatically create queues if not exist
        endpointConfiguration.EnableInstallers();

        //Use RabbitMQ as transport
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology();
        transport.ConnectionString(ConnectionString.RabbitMQConnectionString);

        //Add Saga persistence to MongoDB
        var persistence = endpointConfiguration.UsePersistence<MongoPersistence>();
        persistence.MongoClient(new MongoClient(ConnectionString.MongoDBConnectionString));
        persistence.DatabaseName("SalesDB");
        persistence.UseTransactions(false);

        //Configure monitoring system
        var json = File.ReadAllText("ServicePulseConfig.json");
        var servicePlatformConnection = ServicePlatformConnectionConfiguration.Parse(json);
        endpointConfiguration.ConnectToServicePlatform(servicePlatformConnection);

        //Configure retries
        //I will disable immediate and delayed retries to demo ServicePulse
        var recoverability = endpointConfiguration.Recoverability();
        recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
        recoverability.Delayed(delayed => delayed.NumberOfRetries(0));

        //Start bus
        var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();

        //Stop bus
        await endpointInstance.Stop().ConfigureAwait(false);
    }

We would also like to understand how many databases we need to use for sagas. One for every endpoint, or one database for all the endpoints?

We tried running the demo with two separate databases (SalesDB, ShippingDB) and with a single one for both endpoints (DemoDB), and the duplicate index problem is the same in both cases.

Vinko90 avatar May 20 '22 16:05 Vinko90

@Vinko90 I tried to reproduce this but didn't run into the problem you are reporting.

The only thing I can think of is that RabbitMQ is not transactional. If you are not running outbox and there are certain edge case errors you can get ghost messages which report as duplicates. Perhaps that is what is going on here?
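
For reference, here is a minimal sketch of what enabling the outbox might look like in the configuration style used in this thread. The class name `OutboxSketch`, the method `Configure`, and its parameters are hypothetical. The sketch also assumes that the MongoDB persistence needs transactions left enabled for the outbox, and that MongoDB runs as a replica set, since MongoDB transactions are not available on a standalone server. Whether this removes the duplicate key retry in this particular case is not confirmed in the thread.

```csharp
using MongoDB.Driver;
using NServiceBus;

// Hypothetical helper, not part of the tutorial or of the code posted in this issue.
static class OutboxSketch
{
    public static EndpointConfiguration Configure(
        string endpointName,
        string rabbitMqConnectionString,
        string mongoDbConnectionString)
    {
        var endpointConfiguration = new EndpointConfiguration(endpointName);
        endpointConfiguration.EnableInstallers();

        // Same transport setup as the Program.cs posted above.
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology();
        transport.ConnectionString(rabbitMqConnectionString);

        // MongoDB persistence. UseTransactions(false) is deliberately NOT called:
        // assumption: the outbox needs transactions, which in turn need a
        // replica set (e.g. a container started with "--replSet").
        var persistence = endpointConfiguration.UsePersistence<MongoPersistence>();
        persistence.MongoClient(new MongoClient(mongoDbConnectionString));
        persistence.DatabaseName("SalesDB");

        // Deduplicate incoming messages and store outgoing messages
        // together with the saga state.
        endpointConfiguration.EnableOutbox();

        return endpointConfiguration;
    }
}
```

The returned configuration would be passed to `Endpoint.Start` exactly as in the posted `Main` method.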

boblangley avatar Jun 06 '22 21:06 boblangley

Hey @boblangley , thank you very much for testing this, I have uploaded my code as well here: https://github.com/Vinko90/NServiceBusDemo

I wanted to let you know that if we use MSSQL instead of MongoDB, everything works great and NServiceBus reports no duplicate key errors. Also, I see you are running the MongoDB container with the parameter "--replSet tr0". I will try running it like this as well, maybe that's the issue :)

Just for confirmation, are there any "installers" that create the ServiceControl queues at runtime (if they don't exist, of course)? I am asking because I've noticed that some of our developers run the RabbitMQ container for testing without a persistent volume. That means on every run they need to uninstall and reinstall ServiceControl, otherwise the application won't start because all the NServiceBus endpoints are missing from RabbitMQ.

Thank you :)

Vinko90 avatar Jun 08 '22 19:06 Vinko90