RabbitMQ service stopped consuming, but remained "healthy"
We ran into a case tonight where one of our RabbitMQ services stopped consuming messages without logging any error. The queue gradually filled up until we manually restarted the service's pods, which fixed the problem.
Unfortunately, the MessageProcessingHealthCheck, which is supposed to prevent situations like this, did not help here. It only checks whether elements are sitting in the internal queue without being processed, but the service was no longer consuming messages from its RabbitMQ queue, so the internal queue stayed completely empty.
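To illustrate the gap, here is a minimal sketch of a check that looks at the broker side instead of the internal queue: it reports unhealthy when the broker queue has messages waiting but nothing has been consumed for a while. This is not Motor.NET code. `ConsumptionStallDetector`, `RecordConsumed`, `IsHealthy`, and the `brokerQueueDepth` parameter are hypothetical names, and how the queue depth is obtained (for example from the RabbitMQ metrics) is left open.

```csharp
using System;
using System.Threading;

// Hypothetical sketch, not part of Motor.NET.
// Flags a consumer that has gone silent while the broker queue is non-empty.
public sealed class ConsumptionStallDetector
{
    private readonly TimeSpan _maxIdle;
    private long _lastConsumedTicks; // UTC ticks of the last consumed message

    public ConsumptionStallDetector(TimeSpan maxIdle, DateTimeOffset startedAt)
    {
        _maxIdle = maxIdle;
        _lastConsumedTicks = startedAt.UtcTicks;
    }

    // Call this from the consumer callback each time a message arrives from the broker.
    public void RecordConsumed(DateTimeOffset now) =>
        Interlocked.Exchange(ref _lastConsumedTicks, now.UtcTicks);

    // brokerQueueDepth is assumed to be the number of messages waiting in the broker queue.
    public bool IsHealthy(long brokerQueueDepth, DateTimeOffset now)
    {
        // An empty broker queue means there is nothing to consume, so being idle is fine.
        if (brokerQueueDepth == 0)
        {
            return true;
        }

        var idle = TimeSpan.FromTicks(now.UtcTicks - Interlocked.Read(ref _lastConsumedTicks));
        return idle <= _maxIdle;
    }
}
```

With a check like this, the situation above (broker queue filling up, internal queue empty) would turn unhealthy once the idle time exceeds the configured limit, and the orchestrator could restart the pod on its own.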
By the way, the problem has been quite easy to inspect so far thanks to the Motor.NET and RabbitMQ metrics, which is nice 😃
But we don't know yet why the consumer apparently stopped working.
For reference, we're using Motor.NET 0.9.10.
We had the same problem. Our service was unable to publish messages, but the health check stayed healthy. In our case, the service was printing a lot of errors:
Motor.Extensions.Hosting.Abstractions.TemporaryFailureException: Couldn't publish message
---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Unexpected Exception', classId=0, methodId=0, cause=System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
---> System.Net.Sockets.SocketException (104): Connection reset by peer
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
--- End of inner exception stack trace ---
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.BufferedStream.ReadByteSlow()
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
at RabbitMQ.Client.Framing.Impl.Model._Private_BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
--- End of inner exception stack trace ---
at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.Publisher.TypedMessagePublisher`2.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
We also had a second exception later, caused by a memory leak:
Motor.Extensions.Hosting.Abstractions.TemporaryFailureException: Couldn't publish message
---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Unexpected Exception', classId=0, methodId=0, cause=System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
at System.Net.Sockets.SocketAsyncContext.ReceiveFrom(Memory`1 buffer, SocketFlags& flags, Byte[] socketAddress, Int32& socketAddressLen, Int32 timeout, Int32& bytesReceived)
at System.Net.Sockets.SocketPal.Receive(SafeSocketHandle handle, Byte[] buffer, Int32 offset, Int32 count, SocketFlags socketFlags, Int32& bytesTransferred)
at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags, SocketError& errorCode)
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.BufferedStream.ReadByteSlow()
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
at RabbitMQ.Client.Framing.Impl.Model._Private_BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
--- End of inner exception stack trace ---
at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.Publisher.TypedMessagePublisher`2.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
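Both traces end in a TemporaryFailureException from PublishMessageAsync while the health check kept reporting healthy. As a rough sketch of one way to surface that, the class below counts consecutive publish failures and reports unhealthy once a threshold is reached. It is not Motor.NET code: `PublishFailureTracker` and its members are hypothetical, and wiring it into the publisher and into a health check is assumed rather than shown.

```csharp
using System;
using System.Threading;

// Hypothetical sketch, not part of Motor.NET.
// Tracks consecutive publish failures so a health check can react to them.
public sealed class PublishFailureTracker
{
    private readonly int _maxConsecutiveFailures;
    private int _consecutiveFailures;

    public PublishFailureTracker(int maxConsecutiveFailures)
    {
        if (maxConsecutiveFailures < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConsecutiveFailures));
        }

        _maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Call after a message was published successfully; resets the failure streak.
    public void RecordSuccess() => Interlocked.Exchange(ref _consecutiveFailures, 0);

    // Call when publishing throws, e.g. in a catch block around the publish call.
    public void RecordFailure() => Interlocked.Increment(ref _consecutiveFailures);

    // Healthy as long as the current failure streak is below the threshold.
    public bool IsHealthy =>
        Volatile.Read(ref _consecutiveFailures) < _maxConsecutiveFailures;
}
```

A single successful publish resets the streak, so short connection hiccups would not flip the check, while a permanently closed connection like the one in the traces would.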