fix: use message receiver io processor to correctly stop message pump during circuit breaker

Open stijnmoreels opened this issue 1 year ago • 1 comments

The ServiceBusProcessor could not be stopped from within its own message processing. This PR switches fully to the ServiceBusReceiver, together with a more recent Service Bus messaging package.

stijnmoreels avatar Jun 27 '24 07:06 stijnmoreels

Deploy Preview for arcus-messaging canceled.

Name Link
Latest commit d5cfabf7798c51101c6dd1bfdcd44a7db74510dc
Latest deploy log https://app.netlify.com/sites/arcus-messaging/deploys/675a78670f756d0008e38562

netlify[bot] avatar Jun 27 '24 07:06 netlify[bot]

I've been testing this a bit further and I'm getting good results. However, I now run into an issue: an exception is thrown (I have no idea what triggered it), and after that exception, other traces are logged saying that the received message was 'null'. From then on, the MessagePump no longer reads new messages from Service Bus.

[23:02:26 ERR] Azure Service Bus Queue message pump 'a885a71c-74e8-4325-a681-e9fccbf1a9f6' on entity path 'orders' in namespace 'fg-sb-arcusdemo.servicebus.windows.net' failed to process single message during half-open circuit, retrying after circuit delay
Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:4cc34e78-a2da-47ab-9963-c4e1e30659e8, TrackingId:f6a8ee520000004d01ba96c6672be710_G7_B18, SystemTracker:gi::G7:720044332:amqps://fg-sb-arcusdemo.servicebus.windows.net/-e1a11b51;0:5:6:source(address:/orders,filter:[]), bi::in-connection50454(G7-38509)::session50457::link29005510, Timestamp:2024-11-06T22:02:26 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary`2 propertiesToModify, String deadLetterReason, String deadLetterDescription)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)

After this exception, I can find these traces in the logs:

[23:33:20 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:25 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping

When I inspect the ServiceBus queue that I'm consuming, there are no messages left in the queue. When I add a few more messages, those messages are not being retrieved anymore. I have to manually restart my application before the MessagePump starts reading messages again.

I can reproduce this consistently: once the last message has been processed, this exception saying that the message lock has expired is thrown.
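For reference, the two symptoms above (a 'null' received message and a MessageLockLost failure on completion) can be illustrated with a minimal sketch of a single-message probe built directly on ServiceBusReceiver. This is not the code in this PR: the `HalfOpenProbe` class, the `TryProcessSingleMessageAsync` helper, the `handler` delegate, and the 5-second wait are hypothetical names and values chosen for illustration. Only the Azure.Messaging.ServiceBus calls are real SDK members.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration only; not part of Arcus.Messaging.
public static class HalfOpenProbe
{
    // Tries to process exactly one message and reports whether it succeeded,
    // so that a caller could decide whether to close the circuit again.
    public static async Task<bool> TryProcessSingleMessageAsync(
        ServiceBusReceiver receiver,
        Func<ServiceBusReceivedMessage, Task> handler,
        CancellationToken cancellationToken)
    {
        // ReceiveMessageAsync returns null when no message arrives within
        // maxWaitTime, which is the normal outcome on an empty queue.
        // The 5-second wait is an assumed value.
        ServiceBusReceivedMessage message =
            await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5), cancellationToken);

        if (message is null)
        {
            // Nothing to probe with; the caller can retry after the circuit delay.
            return false;
        }

        try
        {
            await handler(message);
            await receiver.CompleteMessageAsync(message, cancellationToken);
            return true;
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageLockLost)
        {
            // The lock expired or the message was already settled, so it can no
            // longer be completed through this lock. Treating this as a failed
            // probe (rather than a fatal error) is an assumption of this sketch.
            return false;
        }
    }
}
```

In this sketch a null result and a lost lock both end the probe without throwing, so the surrounding loop keeps running and can try again later. Whether that matches the intended circuit-breaker semantics of the message pump is a design decision for the PR.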

On another attempt, when all messages had been processed and new messages were sent to the queue, the MessagePump retrieved only one message.
That first message was retrieved from the queue and processed without problems, and an attempt was made to resume normal processing. However, nothing happened.

These are my logs:

[23:37:27 INF] Process Order eabb888e-afde-49ed-8ac9-f389f8e19fe9
[23:37:27 INF] Start processing HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Sending HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Received HTTP response headers after 19.6146ms - 200
[23:37:27 INF] End processing HTTP request after 37.297ms - 200
[23:37:27 INF] Ordering 9 items of Product B
[23:37:27 WRN] Attempting to resume message-pump
[23:37:27 WRN] {"EventName": "MessagePump.Resumed", "Context": {"TelemetryType": "Events"}, "$type": "EventLogEntry"}
[23:37:27 WRN] {"RequestMethod": "<not-applicable>", "RequestHost": "<not-applicable>", "RequestUri": "<not-applicable>", "ResponseStatusCode": 200, "RequestDuration": "00:00:00.0732313", "RequestTime": "2024-11-06T22:37:27.7335884 +00:00", "SourceSystem": "AzureServiceBus", "CustomRequestSource": null, "OperationName": "Process", "Context": {"ServiceBus-Endpoint": "fg-sb-arcusdemo.servicebus.windows.net", "ServiceBus-Entity": "orders", "ServiceBus-EntityType": "Queue", "TelemetryType": "Request"}, "$type": "RequestLogEntry"}

But afterwards, nothing happened, and the messages that were in the queue remained there; they were not retrieved. (The 'Attempting to resume message-pump' trace and the custom event 'MessagePump.Resumed' are custom traces that I'm writing via the code below.)

_logger.LogWarning("Attempting to resume message-pump");
await _messagePumpCircuitBreaker.ResumeMessageProcessingAsync(messagePumpJobId);
_logger.LogCustomEvent("MessagePump.Resumed");

fgheysels avatar Nov 06 '24 22:11 fgheysels