arcus.messaging
fix: use message receiver io processor to correctly stop message pump during circuit breaker
The ServiceBusProcessor could not be stopped from within its own message processing; this PR switches fully to the ServiceBusReceiver, using a more recent Service Bus Messaging package.
Deploy Preview for arcus-messaging canceled.
| Name | Link |
|---|---|
| Latest commit | d5cfabf7798c51101c6dd1bfdcd44a7db74510dc |
| Latest deploy log | https://app.netlify.com/sites/arcus-messaging/deploys/675a78670f756d0008e38562 |
I'm testing this a bit further and I'm getting good results; however, I now ran into an issue where an exception was thrown (I have no idea what triggered it). After that exception, subsequent traces reported that the received message was 'null', and no new messages were read from Service Bus by the MessagePump.
```
[23:02:26 ERR] Azure Service Bus Queue message pump 'a885a71c-74e8-4325-a681-e9fccbf1a9f6' on entity path 'orders' in namespace 'fg-sb-arcusdemo.servicebus.windows.net' failed to process single message during half-open circuit, retrying after circuit delay
Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:4cc34e78-a2da-47ab-9963-c4e1e30659e8, TrackingId:f6a8ee520000004d01ba96c6672be710_G7_B18, SystemTracker:gi::G7:720044332:amqps://fg-sb-arcusdemo.servicebus.windows.net/-e1a11b51;0:5:6:source(address:/orders,filter:[]), bi::in-connection50454(G7-38509)::session50457::link29005510, Timestamp:2024-11-06T22:02:26 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary`2 propertiesToModify, String deadLetterReason, String deadLetterDescription)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)
```
After this exception, I can find these traces in the logs:
```
[23:33:20 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:25 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
```
When I inspect the Service Bus queue that I'm consuming, there are no messages left in it. When I add a few more messages, they are no longer retrieved; I have to manually restart my application before the MessagePump starts reading messages again.
And I can keep reproducing this: once the last message has been processed, the exception saying that the message lock has expired is thrown.
On another attempt, when all messages were processed and new messages were sent to the queue, the MessagePump retrieved one message.
The first message was retrieved from the queue and processed successfully; an attempt was made to resume normal processing, but nothing happened. These are my logs:
```
[23:37:27 INF] Process Order eabb888e-afde-49ed-8ac9-f389f8e19fe9
[23:37:27 INF] Start processing HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Sending HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Received HTTP response headers after 19.6146ms - 200
[23:37:27 INF] End processing HTTP request after 37.297ms - 200
[23:37:27 INF] Ordering 9 items of Product B
[23:37:27 WRN] Attempting to resume message-pump
[23:37:27 WRN] {"EventName": "MessagePump.Resumed", "Context": {"TelemetryType": "Events"}, "$type": "EventLogEntry"}
[23:37:27 WRN] {"RequestMethod": "<not-applicable>", "RequestHost": "<not-applicable>", "RequestUri": "<not-applicable>", "ResponseStatusCode": 200, "RequestDuration": "00:00:00.0732313", "RequestTime": "2024-11-06T22:37:27.7335884 +00:00", "SourceSystem": "AzureServiceBus", "CustomRequestSource": null, "OperationName": "Process", "Context": {"ServiceBus-Endpoint": "fg-sb-arcusdemo.servicebus.windows.net", "ServiceBus-Entity": "orders", "ServiceBus-EntityType": "Queue", "TelemetryType": "Request"}, "$type": "RequestLogEntry"}
```
But afterwards, nothing happened and the messages that were in the queue remained there; they were not being retrieved. (The 'Attempting to resume message-pump' trace and the custom event 'MessagePump.Resumed' are custom traces that I'm writing via the code below.)
```csharp
_logger.LogWarning("Attempting to resume message-pump");

// Ask the circuit breaker to resume the message pump identified by its job ID.
await _messagePumpCircuitBreaker.ResumeMessageProcessingAsync(messagePumpJobId);

_logger.LogCustomEvent("MessagePump.Resumed");
```