
RabbitMQ service stopped consuming, but remained "healthy"

Open FlorianHockmann opened this issue 2 years ago • 1 comments

We ran into a case tonight where one of our RabbitMQ services stopped consuming messages without logging any error. As a result, the queue gradually filled up until we manually restarted the service's pods, which fixed the problem.

Unfortunately, the MessageProcessingHealthCheck, which is supposed to prevent situations like this, did not help here. It only checks whether elements are sitting in the internal queue without anything being processed. In our case the service no longer consumed messages from its RabbitMQ queue at all, so the internal queue stayed completely empty and the check kept reporting healthy.
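
To make the gap concrete, here is a minimal sketch of the two conditions. It is not Motor.NET's actual implementation: `ConsumerSnapshot`, `StalledConsumerCheck`, and all member names are hypothetical, and the first method only models the health check logic as described above. The broker-side message count is assumed to be obtained elsewhere (for example from the RabbitMQ metrics or a passive queue declare); that wiring is omitted.

```csharp
using System;

// Hypothetical snapshot of what a health check could observe.
// None of these names are Motor.NET or RabbitMQ.Client APIs.
public sealed record ConsumerSnapshot(
    int InternalQueueLength,        // messages already fetched, waiting to be processed
    long BrokerReadyMessages,       // messages still waiting in the RabbitMQ queue
    DateTimeOffset LastProcessedAt, // last time a message finished processing
    DateTimeOffset LastReceivedAt); // last time a message arrived from the broker

public static class StalledConsumerCheck
{
    // Models the check as described in this issue: unhealthy only if the
    // internal queue holds items and nothing has been processed recently.
    public static bool IsHealthyInternalOnly(
        ConsumerSnapshot s, DateTimeOffset now, TimeSpan maxIdle)
        => s.InternalQueueLength == 0 || now - s.LastProcessedAt <= maxIdle;

    // Possible extension (an assumption, not an existing feature): also report
    // unhealthy if the broker has ready messages but none were received recently.
    public static bool IsHealthyWithBrokerBacklog(
        ConsumerSnapshot s, DateTimeOffset now, TimeSpan maxIdle)
        => IsHealthyInternalOnly(s, now, maxIdle)
           && (s.BrokerReadyMessages == 0 || now - s.LastReceivedAt <= maxIdle);
}
```

For a snapshot like ours (empty internal queue, a backlog on the broker, nothing received for hours), `IsHealthyInternalOnly` returns `true` while `IsHealthyWithBrokerBacklog` returns `false`.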

By the way, it was quite easy to investigate the problem up to this point thanks to the Motor.NET and RabbitMQ metrics, which is nice 😃

But we don't know yet why the consumer apparently stopped working.

For reference, we're using Motor.NET 0.9.10.

FlorianHockmann, Mar 18 '22 09:03

We had the same problem. Our service was unable to publish messages, but the health check stayed healthy. In our case the service was logging a lot of errors:

Motor.Extensions.Hosting.Abstractions.TemporaryFailureException: Couldn't publish message
 ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Unexpected Exception', classId=0, methodId=0, cause=System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
 ---> System.Net.Sockets.SocketException (104): Connection reset by peer
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.BufferedStream.ReadByteSlow()
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
   at RabbitMQ.Client.Framing.Impl.Model._Private_BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
   at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
   at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   --- End of inner exception stack trace ---
   at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.Publisher.TypedMessagePublisher`2.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)

Later we also saw a second exception, caused by a memory leak:

Motor.Extensions.Hosting.Abstractions.TemporaryFailureException: Couldn't publish message
 ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Unexpected Exception', classId=0, methodId=0, cause=System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Net.Sockets.SocketAsyncContext.ReceiveFrom(Memory`1 buffer, SocketFlags& flags, Byte[] socketAddress, Int32& socketAddressLen, Int32 timeout, Int32& bytesReceived)
   at System.Net.Sockets.SocketPal.Receive(SafeSocketHandle handle, Byte[] buffer, Int32 offset, Int32 count, SocketFlags socketFlags, Int32& bytesTransferred)
   at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags, SocketError& errorCode)
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.BufferedStream.ReadByteSlow()
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
   at RabbitMQ.Client.Framing.Impl.Model._Private_BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
   at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, ReadOnlyMemory`1 body)
   at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   --- End of inner exception stack trace ---
   at Motor.Extensions.Hosting.RabbitMQ.RabbitMQMessagePublisher`1.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.Publisher.TypedMessagePublisher`2.PublishMessageAsync(MotorCloudEvent`1 motorCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)
   at Motor.Extensions.Hosting.MultiOutputServiceAdapter`2.HandleMessageAsync(MotorCloudEvent`1 dataCloudEvent, CancellationToken token)

cavus700, Mar 31 '22 05:03