Enriching error logs
I have an IConsumerInterceptor registered that takes some data from the message being processed, which is then used by a Serilog enricher.
However, if my consumer fails with an exception, for example, this error log is written https://github.com/zarusz/SlimMessageBus/blob/677cc30c8d46c06eb7977ac3c77539b598d81589/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs#L222.
I am using AsyncLocal to store my enrichment state from the message being processed. By the time the error above is logged, my enrichment state has been lost, so I must be outside the interceptor scope. I presume I'm here https://github.com/zarusz/SlimMessageBus/blob/677cc30c8d46c06eb7977ac3c77539b598d81589/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs#L183 but I have not dug too deeply into the inner SMB code yet.
Are there any quick ways you might be aware of to ensure my enrichment state can be included in error messages logged?
I presume a custom error handler might do the trick?
Thanks in advance!
I can achieve something like what I am after using the following:
public class SerilogTraceErrorHandler<TMessage>(
IOptions<TraceHeader> traceHeader,
ILogger<SerilogTraceErrorHandler<TMessage>> logger
) : ServiceBusConsumerErrorHandler<TMessage>
{
public override Task<ProcessResult> OnHandleError(
TMessage message,
IConsumerContext consumerContext,
Exception exception,
int attempts
)
{
var traceId = consumerContext.Headers.GetTraceId(traceHeader.Value.Name);
using (LogContext.PushProperty(TraceContextEnricher.PropertyName, traceId))
{
// This ensures any trace header value is included with the exception, but it
// does mean the exception is logged twice - once here and once within SMB code.
logger.LogError(exception, "Error processing message");
}
return Task.FromResult(Failure());
}
}
However, as the comment mentions, I can only log the exception again, so it appears twice.
Hey @tjpeel-ee,
A few thoughts and ideas:
- Regarding the standard interceptor approach: since SMB uses `Microsoft.Extensions.Logging`, you might consider using `logger.BeginScope()` within the interceptor (wrapping `await next()`). It could be worth checking if Serilog is able to pick up that scope context for logging.
- On scope handling: the message scope is shared across the `IConsumer<>`, its interceptors, and the error interceptor. Also, the `IConsumerContext` instance remains the same across all of them, so you can access message headers or store custom data via `IConsumerContext.Properties` as needed. I don't fully follow where your `AsyncLocal` state is maintained, so I can't advise on why it could be losing state (I would need an example).
- Really nice use of `ConsumerErrorHandler` in your example, a great fit for the scenario. Regarding the double logging, there is currently no way to suppress that inner error log. It might be useful to explore whether we can suppress the inner error logging when a custom error handler is present, giving full control to the user over how/when to log. Tagging @EtherZa here in case he has further thoughts on that.
- Also, there’s [this related discussion](https://github.com/zarusz/SlimMessageBus/issues/388#issuecomment-2802963247) that might be worth a glance. It may not apply to your case, but good context either way.
Hey @zarusz - thanks for the reply!
- Once our system is deployed, our logging capability does not automatically include whatever scopes might be active; the log event is sanitised. We can of course do what I was doing and use a scope that is retained and then log within that scope so it's present on the log event, but that is not the issue here.
- I have not added to the context properties, didn't know that was possible, I have just intercepted and set a trace ID. Conceptually it's as follows:
services.AddScoped(typeof(IConsumerInterceptor<>), typeof(TraceContextInterceptor<>));
public class TraceContextInterceptor<TMessage>(
IOptions<TraceHeader> traceHeader,
ITraceContextAccessor traceContextAccessor,
HeaderPropagationValues headerPropagationValues
) : IConsumerInterceptor<TMessage>
{
public async Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context)
{
// Setting the trace context will take either the trace ID from the incoming
// message headers or it will start a new trace ID that may be propagated onwards
// to any nested HTTP calls or further message publishing
traceContextAccessor.Context = new TraceContext
{
TraceId = context.Headers.GetTraceId(traceHeader.Value.Name) ?? Guid.NewGuid().ToString("N"),
};
// As per the middleware implementation for header propagation, the following sets
// the headerPropagationValues.Headers value so it can be used by any configured
// HTTP handler
var headers = headerPropagationValues.Headers ??= new Dictionary<string, StringValues>(
StringComparer.OrdinalIgnoreCase
);
headers.Add(traceHeader.Value.Name, traceContextAccessor.Context.TraceId);
return await next();
}
}
/// <summary>
/// Pattern taken from IHttpContextAccessor implementation.
/// </summary>
public class TraceContextAccessor : ITraceContextAccessor
{
private static readonly AsyncLocal<ContextHolder> s_contextCurrent = new();
/// <inheritdoc/>
public ITraceContext? Context
{
get => s_contextCurrent.Value?.Context;
set
{
var holder = s_contextCurrent.Value;
if (holder != null)
holder.Context = null;
if (value != null)
s_contextCurrent.Value = new ContextHolder { Context = value };
}
}
private sealed class ContextHolder
{
public ITraceContext? Context;
}
}
public class TraceContextEnricher : ILogEventEnricher
{
public const string PropertyName = "CorrelationId";
private readonly ITraceContextAccessor _traceContextAccessor;
public TraceContextEnricher()
: this(new TraceContextAccessor()) { }
private TraceContextEnricher(ITraceContextAccessor traceContextAccessor)
{
_traceContextAccessor = traceContextAccessor;
}
/// <inheritdoc/>
public void Enrich(LogEvent logEvent, ILogEventPropertyFactory propertyFactory)
{
var requestContext = _traceContextAccessor.Context;
if (requestContext == null)
return;
logEvent.AddOrUpdateProperty(new LogEventProperty(PropertyName, new ScalarValue(requestContext.TraceId)));
}
}
- The real issue here is that there are logs in SMB that are made outside of the DI scope generated during message consumption. So that is where the `AsyncLocal` approach falls down. If these (primarily error logs) were to move within the DI scope then the context would be retained and my trace ID logic would work. Right now though, I have had to log again as you've seen and then SMB will also log. Being able to silence the SMB error log could be an approach but perhaps not the best.
- I saw the context change PR but sadly that doesn't improve things. I can see why being able to use constructor injection of the context in a consumer is nicer than needing to grab it from a property, but the new context accessor just backs off to the static MessageScope anyway, and even with the new changes it does not alter the logging behaviour I see, as the DI scoping is the same.
Another option might be to provide a way to add logging scopes when a message is being handled, but I don't know how that would be possible the way your code is structured today. Several logs appear outside of the message handling pipeline.
One thing I was concerned about though was needing to add an error handler just to log an error. I presume returning Failure() all the time from the error handler will not change any default Azure Service Bus SMB error handling? I did not see if there was a default error handler registered and if what I added caused a change in behaviour.
Hi @tjpeel-ee.
Have you considered using a DI scoped object for sharing context instead of leveraging AsyncLocal? If you are not doing so already, you would need to enable per message scoping via .PerMessageScopeEnabled(true) in the .AddSlimMessageBus() registration.
With regards to logging the exception with context: the ConsumerErrorHandler does run outside of the DI scope of the consumer, so you would not be able to achieve what you are after there. You could create a custom IConsumerInterceptor<> to catch/log/enrich the exception within the same scope. You may also want to include IInterceptorWithOrder with an order of int.MinValue to ensure that the interceptor is on the outermost ring.
public sealed class ExceptionInterceptor<TMessage> : IConsumerInterceptor<TMessage>, IInterceptorWithOrder
{
private readonly ILogger _logger;
private readonly SharedContext _sharedContext;
public ExceptionInterceptor(ILogger<ExceptionInterceptor<TMessage>> logger, SharedContext sharedContext)
{
_logger = logger;
_sharedContext = sharedContext;
}
public int Order => int.MinValue;
public async Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context)
{
try
{
return await next();
}
catch (Exception ex)
{
// both the shared context and exception are available here
// nb. throw when done
throw;
}
}
}
To exclude unwanted error messages from your logs, you should be able to apply a category-based filter to your logging configuration.
e.g. appsettings.json (untested)
"Logging": {
"LogLevel": {
"Default": "Information",
"SlimMessageBus": "None"
}
}
or with Serilog (untested)
new LoggerConfiguration()
.Filter.ByExcluding(Matching.FromSource("SlimMessageBus"))
...
If that does not work, or is too broad in scope, you could also leverage Serilog.Expressions.
In my opinion, the SMB logs shouldn't be in the same scope as the message, as they pertain to how/what SMB is doing and not directly to the message (even if it is due to an exception being thrown in an IConsumer instance).
The ConsumerErrorHandler is really intended to determine what steps to take on a failure.
Consider creating an instance that:
- Whitelists transient errors for an in-process retry after applying some jitter. This is especially effective for a reattempt after a deadlock.
- Performs an out-of-process retry on exceptions that have occurred due to an issue with the host state (`OutOfMemoryException`) or an external service failure (`SocketException`).
- Sends messages that throw unhandled exceptions (won't be resolved with a retry) directly to the DLQ for RCA (I include the stack trace in the dead-letter reason for easier debugging - assess your security/privacy requirements first!).
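The decision logic in the list above could be sketched roughly as follows. This is only an illustration under stated assumptions: `ErrorAction`, `ErrorPolicy`, `Classify` and `JitteredDelay` are hypothetical names and not part of SMB, `TimeoutException` stands in for whatever you treat as transient (such as a deadlock), and `attempts` is assumed to start at 1. Mapping each `ErrorAction` onto the result your error handler returns is left out.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical outcome type; not an SMB type.
public enum ErrorAction { RetryInProcess, RetryOutOfProcess, DeadLetter }

public static class ErrorPolicy
{
    // Decides what to do with a failed message.
    // Assumption: TimeoutException represents the whitelisted transient errors.
    public static ErrorAction Classify(Exception exception, int attempts, int maxInProcessAttempts = 3)
        => exception switch
        {
            // Transient and still within budget: retry in the same process.
            TimeoutException when attempts < maxInProcessAttempts => ErrorAction.RetryInProcess,
            // Budget used up, host state problem, or external service failure:
            // hand the message back so that it is retried out of process.
            TimeoutException or OutOfMemoryException or SocketException => ErrorAction.RetryOutOfProcess,
            // Anything else will not be fixed by a retry.
            _ => ErrorAction.DeadLetter,
        };

    // Exponential backoff (100 ms, 200 ms, ...) capped at 5 s, plus up to 50% random
    // jitter so that competing consumers do not retry at the same moment.
    public static TimeSpan JitteredDelay(int attempts, Random random)
    {
        var baseMs = Math.Min(5000, 100 * Math.Pow(2, Math.Max(0, attempts - 1)));
        return TimeSpan.FromMilliseconds(baseMs * (1 + random.NextDouble() * 0.5));
    }
}
```

Inside an error handler you would call `ErrorPolicy.Classify(exception, attempts)`, wait for `JitteredDelay(...)` before an in-process retry, and translate the other two outcomes into whatever your transport offers for abandoning or dead-lettering a message.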
Have you considered using OpenTelemetry (Otel) with System.Diagnostics.ActivitySource for contextual logging? Otel can also be used for distributed tracing with ASB. You would need to configure Otel with Serilog and Application Insights sinks, and add IConsumerInterceptor and IPublishInterceptor instances to create the context and add/read the distributed trace ids. This has the added benefit of including baggage (key/value pairs) that will be tagged against the workflow via the trace context. The keys/values are then searchable within App Insights to view the logs across the entire distributed workflow.
Sample to trace a reference number through a distributed workflow:
Activity.Current?.AddBaggage("ReferenceNo", "123");
Sample IConsumerInterceptor singleton (reads traceparent/baggage properties from incoming message and extends trace):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using SlimMessageBus;
using SlimMessageBus.Host.Interceptor;
public class OpenTelemetryConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessage>
{
public const string ConsumerTypeTag = "message.consumer.type";
public const string MessageTypeTag = "message.type";
private readonly ConcurrentDictionary<Type, string> _consumerName;
private readonly ActivitySource _activitySource;
public OpenTelemetryConsumerInterceptor(ActivitySource activitySource)
{
this._activitySource = activitySource;
this._consumerName = new ConcurrentDictionary<Type, string>();
}
public async Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context)
{
static IEnumerable<string> ExtractTraceContext(IReadOnlyDictionary<string, object>? properties, string key)
{
return properties != null && properties.TryGetValue(key, out var rawValue) && rawValue is string value ? [value] : [];
}
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, context.Headers, ExtractTraceContext);
var hasParentTrace = parentContext.ActivityContext.IsValid();
var prevBaggage = Baggage.Current;
if (hasParentTrace)
{
Baggage.Current = parentContext.Baggage;
}
var tags = new Dictionary<string, object?>
{
{ ConsumerTypeTag, context.Consumer.GetType() },
{ MessageTypeTag, typeof(TMessage) }
};
using var activity = this._activitySource.StartActivity("MessageBus.Consume", ActivityKind.Consumer, hasParentTrace ? parentContext.ActivityContext : default, tags: tags);
try
{
var result = await next();
activity?.SetStatus(ActivityStatusCode.Ok);
return result;
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
throw;
}
finally
{
activity?.Stop();
if (hasParentTrace)
{
Baggage.Current = prevBaggage;
}
}
}
}
Accompanying IPublishInterceptor (adds traceparent/baggage properties to outgoing message):
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using SlimMessageBus;
using SlimMessageBus.Host.Interceptor;
public class OpenTelemetryPublisherInterceptor<TMessage> : IPublishInterceptor<TMessage>
{
private readonly ActivitySource _activitySource;
private readonly Type _messageType;
public OpenTelemetryPublisherInterceptor(ActivitySource activitySource)
{
this._activitySource = activitySource;
this._messageType = typeof(TMessage);
}
public async Task OnHandle(TMessage message, Func<Task> next, IProducerContext context)
{
var parentActivity = Activity.Current;
var parentContext = parentActivity?.Context ?? default;
PropagationContext? propagationContext = null;
using var activity = this._activitySource.StartActivity("MessageBus.Publish", ActivityKind.Producer, parentContext: parentContext);
if (activity != null)
{
activity.SetTag("message.type", this._messageType);
var baggage = Baggage.Current.SetBaggage(parentActivity?.Baggage);
propagationContext = new PropagationContext(activity.Context, baggage);
if (propagationContext != null)
{
Propagators.DefaultTextMapPropagator.Inject(propagationContext.Value, context.Headers, (properties, key, value) => properties[key] = value);
}
}
await next();
}
}
Metrics can be added in the same way:
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;
using SlimMessageBus;
using SlimMessageBus.Host.Interceptor;
public class MetricConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessage>
{
private readonly Counter<long> _count;
private readonly Counter<long> _errorCount;
private readonly Histogram<double> _duration;
public MetricConsumerInterceptor(Meter meter, string appName)
{
this._count = meter.CreateCounter<long>($"{appName}.svb.message.count");
this._errorCount = meter.CreateCounter<long>($"{appName}.svb.message.error");
this._duration = meter.CreateHistogram<double>($"{appName}.svb.message.duration", "s", "Measures the duration of the event");
}
public async Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context)
{
var tags = new TagList
{
{ "message-type", typeof(TMessage).Name }
};
var stopwatch = Stopwatch.StartNew();
try
{
this._count.Add(1, tags);
return await next();
}
catch
{
this._errorCount.Add(1, tags);
throw;
}
finally
{
stopwatch.Stop();
this._duration.Record(stopwatch.Elapsed.TotalSeconds, tags);
}
}
}
Hi @EtherZa,
Thanks for the input and for sharing your examples! Regarding your OTEL interceptor setup — I’ve actually been considering building an OTEL plugin that would do exactly what you demonstrated. Might be worth submitting a PR for it 🙂 It could potentially resolve #333.
Hi @tjpeel-ee,
I believe the issue you’re seeing with TraceContextAccessor and AsyncLocal<T> losing scope between the interceptor and the error handler stems from how SMB processes messages. In SlimMessageBus, the message handler and error handler are executed as separate, sibling async flows (not nested), which causes the async context (and thus AsyncLocal) to be lost between them. See this line for reference.
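The effect can be reproduced without SMB. The following is a minimal sketch and not SMB's actual code: `FlowDemo`, `HandleAsync` and `RunAsync` are hypothetical names, and the `catch` block merely stands in for the place where the bus logs the error or calls the error handler. A value assigned to an `AsyncLocal<T>` inside an async method is visible further down that flow, but it does not flow back to the caller once the method completes or throws.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FlowDemo
{
    // Plays the role of the AsyncLocal behind TraceContextAccessor.
    private static readonly AsyncLocal<string?> TraceId = new();

    // Stands in for the bus: runs the handler, then deals with the failure.
    public static async Task<(string? InsideHandler, string? InCatch)> RunAsync()
    {
        string? inside = null;
        try
        {
            await HandleAsync(value => inside = value);
        }
        catch (InvalidOperationException)
        {
            // We are back in the caller's flow here, so the value set
            // inside HandleAsync is no longer visible.
            return (inside, TraceId.Value);
        }
        return (inside, TraceId.Value);
    }

    // Stands in for interceptor + consumer: sets the value, then fails.
    private static async Task HandleAsync(Action<string?> observe)
    {
        TraceId.Value = "abc123";
        await Task.Yield();
        observe(TraceId.Value); // still set within this flow
        throw new InvalidOperationException("consumer failed");
    }
}
```

`await FlowDemo.RunAsync()` returns `("abc123", null)`: the handler sees the trace ID, while code that runs after the handler has thrown does not.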
Some options mentioned before that may help:
- Using a DI-scoped object (suggested by @EtherZa)
- Avoiding double logging by silencing either `SlimMessageBus.Host.AzureServiceBus.Consumer.AsbQueueConsumer` or `SlimMessageBus.Host.AzureServiceBus.Consumer.AsbTopicSubscriptionConsumer`
- Leveraging `IConsumerContext.Properties` to persist state across the consumer/interceptor/error handler and reconstruct your `TraceContextEnricher`
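For the last option, a small pair of helpers might look like the sketch below. It assumes that `IConsumerContext.Properties` can be used as an `IDictionary<string, object>`; `TracePropertyExtensions`, `SetTraceId` and the `"TraceId"` key are hypothetical names chosen for illustration.

```csharp
using System.Collections.Generic;

public static class TracePropertyExtensions
{
    // Hypothetical key; any name that does not clash with other properties will do.
    private const string TraceIdKey = "TraceId";

    // Call from the interceptor, e.g. context.Properties.SetTraceId(traceId).
    public static void SetTraceId(this IDictionary<string, object> properties, string traceId)
        => properties[TraceIdKey] = traceId;

    // Call from the error handler; returns null when no trace ID was stored.
    public static string? GetTraceId(this IDictionary<string, object> properties)
        => properties.TryGetValue(TraceIdKey, out var value) ? value as string : null;
}
```

Because the same context instance is shared, the error handler could then read the value back and push it with `LogContext.PushProperty(...)`, as in the `SerilogTraceErrorHandler` shown earlier, instead of relying on `AsyncLocal`.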
> One thing I was concerned about though was needing to add an error handler just to log an error. I presume returning Failure() all the time from the error handler will not change any default Azure Service Bus SMB error handling? I did not see if there was a default error handler registered and if what I added caused a change in behaviour.
Good question. Adding a custom error handler currently doesn’t significantly change what's logged by the ASB consumer. I’ve been considering whether we should suppress the default SMB error logging when a custom error handler is in place. One idea is to add a flag to the error handler result (initially for ASB, potentially others later) that gives more control over the logging behavior.
Still debating whether that's the right path — would love to hear your thoughts on it, @EtherZa.
> Thanks for the input and for sharing your examples! Regarding your OTEL interceptor setup - I’ve actually been considering building an OTEL plugin that would do exactly what you demonstrated. Might be worth submitting a PR for it 🙂 It could potentially resolve #333.
No worries. I haven't submitted it as a plugin before as the OTEL registration is a significant part of the setup but very much out of the scope of SMB, but please feel free to scrape the code where/if appropriate.
Perhaps there is room for a "recipes" repo to sit alongside SMB? There are some other patterns that could be useful, like end-to-end tests using Reqnroll (BDD) or .NET Aspire integration, etc.
> Good question. Adding a custom error handler currently doesn’t significantly change what's logged by the ASB consumer. I’ve been considering whether we should suppress the default SMB error logging when a custom error handler is in place. One idea is to add a flag to the error handler result (initially for ASB, potentially others later) that gives more control over the logging behavior.
> Still debating whether that's the right path - would love to hear your thoughts on it, @EtherZa.
IMO that would be blurring the lines between SMB internals and consumer implementation. If you consider that SMB is a framework, the logs should exist to provide an audit history of why an execution path was taken, to aid in debugging of SMB, i.e. an exception occurred in the consumer, therefore... These should be separate from the consumer implementation.
A CustomErrorHandler does not perform any logging itself (even if the consumer decides to do so), but the fact that execution was deferred to a CustomErrorHandler, or what the response from the handler was, could be useful in the context of SMB logging (debug or trace).
All SMB logs are categorised as being SMB logs via the namespace, i.e. ILogger<SlimMessageBus...>, and can be filtered if not required. I suspect that most consumers of the library would only be interested in the coarse messages, i.e. "Topology created", "consumer disabled", and perhaps "message XXX received". From my perspective, I want to know why SMB is not running (catastrophic failure), and what happened in my workflow (independent of SMB); but mostly what is happening with "my" code.
Can we close this issue @tjpeel-ee?
Apologies @zarusz, yeah, this can now be closed. I will do it in fact. I meant to circle back on this and thank you for the insightful contributions. They very much helped my understanding.