NServiceBus.AmazonSQS
NServiceBus.AmazonSQS copied to clipboard
Improve log message when Amazon.SQS.Model.QueueDoesNotExistException
Hi,
Today in my non-prod env I'm facing Amazon.SQS.Model.QueueDoesNotExistException and I don't know which queue NServiceBus is trying to access
The call stack:
Amazon.SQS.Model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version.
---> Amazon.Runtime.Internal.HttpErrorResponseException: Exception of type 'Amazon.Runtime.Internal.HttpErrorResponseException' was thrown.
at Amazon.Runtime.HttpWebRequestMessage.GetResponseAsync(CancellationToken cancellationToken)
at Amazon.Runtime.Internal.HttpHandler`1.InvokeAsync[T](IExecutionContext executionContext)\
at Amazon.Runtime.Internal.Unmarshaller.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.SQS.Internal.ValidationResponseHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
--- End of inner exception stack trace ---
at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionAsync(IExecutionContext executionContext, HttpErrorResponseException exception)
at Amazon.Runtime.Internal.ExceptionHandler`1.HandleAsync(IExecutionContext executionContext, Exception exception)
at Amazon.Runtime.Internal.ErrorHandler.ProcessExceptionAsync(IExecutionContext executionContext, Exception exception)
at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.CredentialsRetriever.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.ErrorCallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
at Amazon.Runtime.Internal.MetricsHandler.InvokeAsync[T](IExecutionContext executionContext)
at NServiceBus.Transport.SQS.QueueCache.GetQueueUrl(String queueName) in /_/src/NServiceBus.Transport.SQS/QueueCache.cs:line 54
at NServiceBus.Transport.SQS.MessageDispatcher.ApplyUnicastOperationMappingIfNecessary(UnicastTransportOperation transportOperation, SqsPreparedMessage sqsPreparedMessage, Int64 delaySeconds, String messageId, Dictionary`2 nativeMessageAttributes) in /_/src/NServiceBus.Transport.SQS/MessageDispatcher.cs:line 374
at NServiceBus.Transport.SQS.MessageDispatcher.PrepareMessage[TMessage](IOutgoingTransportOperation transportOperation, HashSet`1 messageIdsOfMulticastedEvents, TransportTransaction transportTransaction) in /_/src/NServiceBus.Transport.SQS/MessageDispatcher.cs:line 322
at NServiceBus.Transport.SQS.MessageDispatcher.DispatchBatched(IEnumerable`1 toBeBatchedTransportOperations, HashSet`1 messageIdsOfMulticastEvents, TransportTransaction transportTransaction) in /_/src/NServiceBus.Transport.SQS/MessageDispatcher.cs:line 104
at NServiceBus.Transport.SQS.MessageDispatcher.Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context) in /_/src/NServiceBus.Transport.SQS/MessageDispatcher.cs:line 59
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next) in /_/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs:line 61
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext) in /_/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs:line 50
at NServiceBus.Transport.SQS.InputQueuePump.ProcessMessageWithInMemoryRetries(Dictionary`2 headers, String nativeMessageId, Byte[] body, Message nativeMessage, CancellationToken token) in /_/src/NServiceBus.Transport.SQS/InputQueuePump.cs:line 273
Maybe it's related with https://github.com/Particular/NServiceBus.AmazonSQS/issues/227
NServiceBus: 7.4.6 NServiceBus.AmazonSQS: 5.4.0 .NET 5.0.403 on Container with Debian
Thanks for the feedback @lillo42 We will look at improving this in a future release.
In the meantime, as this is a non-prod environment, can you enable installers for the endpoint and have it create the queues for you?
@mikeminutillo It's already enabled, for some reason it's resolving a queue that not exist/created
@lillo42 if you're still trying to figure out which queue is causing the problem we may be able to at least log all operations that are going out. Try adding this code to your endpont in your development environment:
using System;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
class Spy : Feature
{
public Spy()
{
EnableByDefault();
}
protected override void Setup(FeatureConfigurationContext context)
{
context.Pipeline.Register(new InterceptionSpy.Registration());
}
class InterceptionSpy : Behavior<IIncomingPhysicalMessageContext>
{
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var pendingTransportOperations = context.Extensions.Get<PendingTransportOperations>();
await next().ConfigureAwait(false);
var addresses = (from operation in pendingTransportOperations.Operations
where operation.AddressTag is UnicastAddressTag
let unicastAddressTag = (UnicastAddressTag) operation.AddressTag
select $"\t- {unicastAddressTag.Destination}").Distinct().ToArray();
if (addresses.Any())
{
Logger.Info($"Detected operations sending to: {Environment.NewLine}{string.Join(Environment.NewLine, addresses)}");
}
}
public class Registration : RegisterStep
{
public Registration() : base("InterceptionSpy", typeof(InterceptionSpy), "Logs where we're about to send messages")
{
InsertBeforeIfExists("AuditProcessedMessage");
}
}
private static ILog Logger = LogManager.GetLogger<InterceptionSpy>();
}
}
Just adding this code to your project will install a new behavior in the receive pipeline. This behavior will look for pending transport operations that are about to be stored in the outbox. This is what is going to be dispatched to the transport. It will add a log message like this to the logs:
Detected operations sending to:
- Address1
- Address2
When there is a failure, in the logs you should be able to look at the previous spy log message and see which logical queues the endpoint is trying to send to. Hopefully that will help narrow things down. Ideally I'd like to have this be able to call into QueueCache and see if we can't replicate the exception but unfortunately that is not-public.
@lillo42 were you able to figure out which queue is causing the issue?
@mikeminutillo yes, I've figure out, it's seems to be a queue of other env, I've fixed it by remove the EnableMessageDrivenPubSubCompatibilityMode and It's start to work
Ah, OK. That makes sense if there was a queue in the message-driven pub-sub subscription storage. We would assume that the subscriber is responsible for that queue, so we don't create it for you.
@lillo42 This will be fixed as part of 5.6.2 which should go out in the next few days.