NServiceBus
NServiceBus copied to clipboard
Duplicate OrderId index in Saga tutorial
Hey guys,
We are learning NServiceBus in our organization since we would like to start using it in production. We currently completed the tutorial on Sagas (This one in specific: https://docs.particular.net/tutorials/nservicebus-sagas/2-timeouts ), however there is an issue when we replace the LearningTransport with MongoDB (Running in Docker).
The issue is that apparently there is a duplicate "OrderId_1" in ShippingPolicyData but we have no idea where this is coming from. We double check and our code is exactly the same as the tutorial code and all message objects only have one OrderId has a property.
Output:
2022-05-20 18:11:59.679 INFO Auditing processed messages to 'audit'
2022-05-20 18:11:59.693 INFO No valid license could be found. Falling back to trial license with start date '2022-05-12'.
Press Enter to exit.
2022-05-20 18:12:27.763 INFO [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderPlaced
2022-05-20 18:12:27.763 INFO [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderBilled
2022-05-20 18:12:27.867 INFO Immediate Retry is going to retry message '345af3b7-5277-4c53-92af-ae9b010b1838' because of an exception:
MongoDB.Driver.MongoWriteException: A write operation resulted in an error.
E11000 duplicate key error collection: Shipping.shippingpolicydata index: OrderId_1 dup key: { OrderId: "45138491-1820-479b-a922-98a4d7f21138" }
---> MongoDB.Driver.MongoBulkWriteException`1[MongoDB.Bson.BsonDocument]: A bulk write operation resulted in one or more errors.
E11000 duplicate key error collection: Shipping.shippingpolicydata index: OrderId_1 dup key: { OrderId: "45138491-1820-479b-a922-98a4d7f21138" }
at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
--- End of inner exception stack trace ---
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
at NServiceBus.Storage.MongoDB.SagaPersister.Save(IContainSagaData sagaData, SagaCorrelationProperty correlationProperty, SynchronizedStorageSession session, ContextBag context)
at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 155
at NServiceBus.SagaAudit.CaptureSagaStateBehavior.Invoke(IInvokeHandlerContext context, Func`1 next)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 48
at NServiceBus.ScheduledTaskHandlingBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Scheduling/ScheduledTaskHandlingBehavior.cs:line 22
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 16
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 34
at ReceivePerformanceDiagnosticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 18
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 37
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 35
at NServiceBus.TransportReceiver.InvokePipeline(MessageContext c) in /_/src/NServiceBus.Core/Transports/TransportReceiver.cs:line 58
at NServiceBus.TransportReceiver.InvokePipeline(MessageContext c) in /_/src/NServiceBus.Core/Transports/TransportReceiver.cs:line 64
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(EventingBasicConsumer consumer, BasicDeliverEventArgs message, Byte[] messageBody) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 368
Exception details:
Message ID: 345af3b7-5277-4c53-92af-ae9b010b1838
2022-05-20 18:12:27.943 INFO [Order: 45138491-1820-479b-a922-98a4d7f21138] Received OrderBilled
2022-05-20 18:12:28.017 INFO [Order: 45138491-1820-479b-a922-98a4d7f21138] Order successfully shipped.
Our Program.cs:
static async Task Main()
{
Console.Title = "Sales";
//Set service endpoint name
var endpointConfiguration = new EndpointConfiguration("Sales");
//Automatically create queues if not exist
endpointConfiguration.EnableInstallers();
//Use RabbitMQ as transport
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseConventionalRoutingTopology();
transport.ConnectionString(ConnectionString.RabbitMQConnectionString);
//Add Saga persistence to MongoDB
var persistence = endpointConfiguration.UsePersistence<MongoPersistence>();
persistence.MongoClient(new MongoClient(ConnectionString.MongoDBConnectionString));
persistence.DatabaseName(""SalesDB"");
persistence.UseTransactions(false);
//Configure monitoring system
var json = File.ReadAllText("ServicePulseConfig.json");
var servicePlatformConnection = ServicePlatformConnectionConfiguration.Parse(json);
endpointConfiguration.ConnectToServicePlatform(servicePlatformConnection);
//Configure retries
//I will disable immadiate and delayed retry to demo service pulse
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
//Start bus
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
//Stop bus
await endpointInstance.Stop().ConfigureAwait(false);
}
What we would like to understand also is, how many databases we need to use for sagas? One for every endpoint or one db for all the endopoints?
We tried running the demo with two separate db's (SalesDB, ShippingDB), and one for both (DemoDB) and the problem with duplicate index is still the same.
@Vinko90 I tried to reproduce this but didn't run into the problem you are reporting.
The only thing I can think of is that RabbitMQ is not transactional. If you are not running outbox and there are certain edge case errors you can get ghost messages which report as duplicates. Perhaps that is what is going on here?
Hey @boblangley , thank you very much for testing this, I have uploaded my code as well here: https://github.com/Vinko90/NServiceBusDemo
I wanted to let you know that if we use MSSQL instead of MongoDB, everything works great and no duplicate message fields are reported by NServiceBus. Also, I see you are running MongoDB container with this parameter "--replSet tr0", I will try running like this as well, maybe that's the issue :)
Just for confirmation, are there any "Installers" for creating ServiceControl queue at runtime? (If not exist, of course). The reason why I am asking this question is because I've noticed that some of our developers run the RabbitMQ container for testing without a persistence volume, that means on every run they need to uninstall/re-install ServiceControl, otherwise the application won't start since it's missing all the NServiceBus endpoint in RabbitMQ.
Thank you :)