PostgreSQL connection issues when used in combination with the RabbitMQ transport
Describe the bug
Description
I noticed that my saga handlers are constantly and immediately retried under normal circumstances (no load at all) when using PostgreSQL persistence. After detailed examination, I concluded that the connection gets unexpectedly disposed or closed. I have tried adjusting the connection settings and pooling, but to no avail. Switching to MySQL works with no issues, so I suspect Npgsql.
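For context, a PostgreSQL saga persistence setup of the kind described here usually looks something like the sketch below. It follows the documented SQL persistence API and is not copied from the repro; the `PersistenceSetup` helper and the `connectionString` parameter are hypothetical placeholders.

```csharp
using Npgsql;
using NpgsqlTypes;
using NServiceBus;

static class PersistenceSetup
{
    // Hypothetical helper; connectionString is a placeholder supplied by the caller.
    public static void Configure(EndpointConfiguration endpointConfiguration, string connectionString)
    {
        var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();

        // Select the PostgreSQL dialect and tell it how to mark JSONB parameters.
        var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
        dialect.JsonBParameterModifier(parameter =>
        {
            var npgsqlParameter = (NpgsqlParameter)parameter;
            npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
        });

        // A new connection object is created each time the persistence asks for one.
        persistence.ConnectionBuilder(() => new NpgsqlConnection(connectionString));
    }
}
```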
Expected behavior
Saga handlers should execute with no immediate retries under normal circumstances.
Actual behavior
Handlers are almost always retried due to a disposed or closed connection.
Versions
- NServiceBus 8.1.1
- NServiceBus.Persistence.Sql 7.0.2
- Npgsql 7.0.4
- PostgreSQL 15.2 and 15.3
Steps to reproduce
Here is the solution to reproduce the issue: https://github.com/zeko77/PostgresSagaIssue
Relevant log output
I have disabled retries in order to log the failure.
fail: NServiceBus.MoveToError[0]
Moving message 'e6f76034-73c1-4a4f-af5a-b03000ed68a3' to the error queue 'error' because processing failed due to an exception:
System.ObjectDisposedException: NpgsqlTransaction
at Npgsql.NpgsqlTransaction.CheckDisposed()
at Npgsql.NpgsqlTransaction.CheckReady()
at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402
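For anyone reproducing this, retries can be switched off so that the first failure goes straight to the error queue and is logged as above. This is a minimal sketch using the NServiceBus recoverability API; the `RecoverabilitySetup` helper is hypothetical and the repro may configure this differently.

```csharp
using NServiceBus;

static class RecoverabilitySetup
{
    // Hypothetical helper: turns off both immediate and delayed retries.
    public static void DisableRetries(EndpointConfiguration endpointConfiguration)
    {
        var recoverability = endpointConfiguration.Recoverability();
        recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
        recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
    }
}
```

With retries enabled again, the same failure is hidden because the message succeeds on the next attempt.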
Additional Information
No response
@zeko77 Our last release passed its tests: https://github.com/Particular/NServiceBus.Persistence.Sql/actions/runs/5312223106
It also used Npgsql 7.0.4 but, based on the CI workflow, was tested against postgres:14.
I'll run your repro tomorrow to see what happens on my workstation.
@zeko77 I tested your repro and https://github.com/Particular/docs.particular.net/blob/master/samples/sql-persistence/simple/SqlPersistence_7/
Our sample works fine with Postgres 14 and 15, but yours uses the RabbitMQ transport while our sample uses the learning transport.
After converting your repro to the learning transport, it works.
It seems we have an incompatibility between RabbitMQ and the Postgres dialect.
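For reference, swapping the transport is roughly a one-line change. The sketch below assumes the NServiceBus 8 transport API and is not the exact change made to the repro; the `TransportSetup` helper is hypothetical.

```csharp
using NServiceBus;

static class TransportSetup
{
    // Hypothetical helper: replaces the RabbitMQ transport with the
    // file-based learning transport, leaving the persistence untouched.
    public static void UseLearningTransport(EndpointConfiguration endpointConfiguration)
    {
        endpointConfiguration.UseTransport(new LearningTransport());
    }
}
```

Keeping the PostgreSQL persistence configuration unchanged while swapping only the transport is what isolates the transport as the variable.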
I got the following errors. It seems as if the lifetimes of various objects are incorrectly managed. It's as if certain types are either not registered as transient, or the lifetime scope isn't correctly defined, resulting in re-use of
System.InvalidOperationException: The reader is closed
at Npgsql.NpgsqlDataReader.GetFieldValueSequential[T](Int32 column, Boolean async, CancellationToken cancellationToken)
at SagaPersister.GetSagaData[TSagaData](ISynchronizedStorageSession session, String commandText, RuntimeSagaInfo sagaInfo, ParameterAppender appendParameters, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 92
at SagaPersister.Get[TSagaData](String propertyName, Object propertyValue, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 31
at NServiceBus.PropertySagaFinder`1.Find(IServiceProvider builder, SagaFinderDefinition finderDefinition, ISynchronizedStorageSession storageSession, ContextBag context, Object message, IReadOnlyDictionary`2 messageHeaders, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Sagas/PropertySagaFinder.cs:line 42
at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 77
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 40
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402
Npgsql.NpgsqlOperationInProgressException (0x80004005): The connection is already in state 'Executing'
at Npgsql.Internal.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|279_0(ConnectorState newState, NpgsqlCommand command, <>c__DisplayClass279_0&)
at Npgsql.Internal.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command, CancellationToken cancellationToken, Boolean attemptPgCancellation)
at Npgsql.Internal.NpgsqlConnector.StartUserAction(CancellationToken cancellationToken, Boolean attemptPgCancellation)
at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402
Npgsql.NpgsqlOperationInProgressException (0x80004005): A command is already in progress:
at Npgsql.Internal.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|279_0(ConnectorState newState, NpgsqlCommand command, <>c__DisplayClass279_0&)
at Npgsql.Internal.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command, CancellationToken cancellationToken, Boolean attemptPgCancellation)
at Npgsql.Internal.NpgsqlConnector.StartUserAction(CancellationToken cancellationToken, Boolean attemptPgCancellation)
at Npgsql.Internal.NpgsqlConnector.Reset(Boolean async)
at Npgsql.NpgsqlConnection.CloseAsync(Boolean async)
at Npgsql.NpgsqlConnection.Close()
at Npgsql.NpgsqlConnection.Dispose(Boolean disposing)
at System.ComponentModel.Component.Dispose()
at StorageSession.Dispose() in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 114
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.DisposeAsync()
--- End of stack trace from previous location ---
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402
System.ObjectDisposedException: NpgsqlTransaction
at Npgsql.NpgsqlTransaction.CheckDisposed()
at Npgsql.NpgsqlTransaction.CheckReady()
at Npgsql.NpgsqlTransaction.Commit(Boolean async, CancellationToken cancellationToken)
at StorageSession.CompleteAsync(CancellationToken cancellationToken) in /_/src/SqlPersistence/SynchronizedStorage/StorageSession.cs:line 97
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 49
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 35
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 402
Npgsql.NpgsqlException (0x80004005): Exception while reading from stream
---> System.IO.IOException: Unable to read data from the transport connection: An established connection was aborted by the software in your host machine..
---> System.Net.Sockets.SocketException (10053): An established connection was aborted by the software in your host machine.
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
--- End of inner exception stack trace ---
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at Npgsql.Internal.NpgsqlReadBuffer.<Ensure>g__EnsureLong|42_0(NpgsqlReadBuffer buffer, Int32 count, Boolean async, Boolean readingNotifications)
at Npgsql.Internal.NpgsqlReadBuffer.<Ensure>g__EnsureLong|42_0(NpgsqlReadBuffer buffer, Int32 count, Boolean async, Boolean readingNotifications)
at Npgsql.Internal.NpgsqlReadBuffer.Skip(Int64 len, Boolean async)
at Npgsql.Internal.NpgsqlReadBuffer.ColumnStream.DisposeAsync(Boolean disposing, Boolean async)
at Npgsql.Internal.NpgsqlReadBuffer.ColumnStream.Dispose(Boolean disposing)
at System.IO.Stream.Close()
at Npgsql.PreparedTextReader.Dispose(Boolean disposing)
at System.IO.TextReader.Close()
at Newtonsoft.Json.JsonTextReader.Close()
at Newtonsoft.Json.JsonReader.Dispose(Boolean disposing)
at Newtonsoft.Json.JsonReader.System.IDisposable.Dispose()
at Serializer.Deserialize[T](TextReader reader) in /_/src/SqlPersistence/Serializer.cs:line 36
at SagaPersister.ReadMetadata(DbDataReader dataReader, String& originator, String& originalMessageId) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 112
at SagaPersister.GetSagaData[TSagaData](ISynchronizedStorageSession session, String commandText, RuntimeSagaInfo sagaInfo, ParameterAppender appendParameters, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 95
at SagaPersister.Get[TSagaData](String propertyName, Object propertyValue, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken) in /_/src/SqlPersistence/Saga/SagaPersister_Get.cs:line 31
at NServiceBus.PropertySagaFinder`1.Find(IServiceProvider builder, SagaFinderDefinition finderDefinition, ISynchronizedStorageSession storageSession, ContextBag context, Object message, IReadOnlyDictionary`2 messageHeaders, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Sagas/PropertySagaFinder.cs:line 42
at NServiceBus.SagaPersistenceBehavior.Invoke(IInvokeHandlerContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/SagaPersistenceBehavior.cs:line 154
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs:line 50
at NServiceBus.InvokeSagaNotFoundBehavior.Invoke(IIncomingLogicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Sagas/InvokeSagaNotFoundBehavior.cs:line 17
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage) in /_/src/NServiceBus.Core/Pipeline/Incoming/DeserializeMessageConnector.cs:line 32
at NServiceBus.InvokeAuditPipelineBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Audit/InvokeAuditPipelineBehavior.cs:line 19
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next) in /_/src/NServiceBus.Core/Performance/Statistics/ProcessingStatisticsBehavior.cs:line 25
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 68
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/ServicePlatform/Retries/RetryAcknowledgementBehavior.cs:line 25
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 49
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 68
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 446
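As an illustration only: `NpgsqlOperationInProgressException` is the kind of error Npgsql raises when a single connection is used by two operations at the same time. The sketch below provokes that condition deliberately, outside NServiceBus. It assumes a reachable PostgreSQL server and a placeholder connection string, the `ConcurrentUseSketch` class is hypothetical, and it does not claim that this is the root cause of the traces above.

```csharp
using System;
using System.Threading.Tasks;
using Npgsql;

static class ConcurrentUseSketch
{
    // connectionString is a placeholder for a reachable PostgreSQL instance.
    public static async Task Run(string connectionString)
    {
        await using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync();

        await using var slow = new NpgsqlCommand("SELECT pg_sleep(1)", connection);
        await using var fast = new NpgsqlCommand("SELECT 1", connection);

        // Start the first command but do not await it yet.
        var pending = slow.ExecuteNonQueryAsync();

        try
        {
            // A second command on the same connection while the first is still
            // running is expected to be rejected by Npgsql.
            await fast.ExecuteScalarAsync();
        }
        catch (NpgsqlOperationInProgressException ex)
        {
            Console.WriteLine(ex.Message);
        }

        // Let the first command finish before the connection is disposed.
        await pending;
    }
}
```

If a connection or transaction ends up shared across overlapping message-processing scopes, similar symptoms could appear, which would be consistent with the lifetime concern raised earlier in the thread.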
@zeko77 Can you share how you are currently affected and whether you are a licensed customer? If you don't want to disclose this publicly, please provide these details via [email protected] and mention this issue URL and my name.
Thanks for the prompt response. I am also getting all of the exceptions that you mentioned. I am a licensed customer, but this issue occurs on a new project that is still in the development phase. I happened to take a look at the logs and discovered this. Since it is resolved on the first retry, it went under the radar for quite some time.
@zeko77 We've prioritized your issue near the top of our list and will work on it as soon as we can.