NServiceBus
NServiceBus copied to clipboard
Provide configuration for header propagation
Occasionally we see a need to copy headers from an incoming message to an outgoing message. The framework does this for a few headers already. i.e. NServiceBus.ConversationId is copied, NServiceBus.MessageId is copied into NServiceBus.RelatedTo. To add more, a custom behavior is required.
What if, instead, there was a public API to configure this type of header propagation:
var headerPropagation = endpointConfiguration.EnableHeaderPropagation();
// Copies the CustomerId header from incoming message to outgoing message
headerPropagation.CopyHeader("CustomerId");
// Copies the value from the OrderId header on the incoming message to the TriggeringOrderId header on the outgoing message
headerPropagation.CopyHeader("OrderId", "TriggeringOrderId");
This would allow end users to make certain headers "sticky", carrying context through a conversation without having to override the framework's conversation id.
our header propagation uses a propagate. prefix to indicate something that should be propagated. we also have many NSB helpers for claims
using NServiceBus.Extensibility;
using NServiceBus.Faults;
using NServiceBus.Pipeline;
namespace Utilities.NServiceBus;
public static partial class HeaderPropagationExtensions
{
public static void SetPropagateHeader(IDictionary<string, string> headers, string key, string value)
{
Guard.AgainstEmpty(key, nameof(key));
headers[$"propagate.{key}"] = value;
}
public static void SetPropagateHeader(this ExtendableOptions options, string key, string value)
{
Guard.AgainstEmpty(key, nameof(key));
options.SetHeader($"propagate.{key}", value);
}
public static IReadOnlyDictionary<string, string> PropagationHeaders(this IIncomingPhysicalMessageContext context)
=> PropagationHeaders(context.MessageHeaders);
public static IReadOnlyDictionary<string, string> PropagationHeaders(this MutateIncomingTransportMessageContext context)
=> PropagationHeaders(context.Headers);
public static IReadOnlyDictionary<string, string> PropagationHeaders(this IOutgoingPhysicalMessageContext context)
=> PropagationHeaders(context.Headers);
public static IReadOnlyDictionary<string, string> PropagationHeaders(this MutateOutgoingTransportMessageContext context)
=> PropagationHeaders(context.OutgoingHeaders);
public static IReadOnlyDictionary<string, string> PropagationHeaders(this FailedMessage failedMessage)
=> PropagationHeaders(failedMessage.Headers);
public static IReadOnlyDictionary<string, string> PropagationHeaders(this IMessageHandlerContext context)
=> PropagationHeaders(context.MessageHeaders);
public static IReadOnlyDictionary<string, string> PropagationHeaders(IReadOnlyDictionary<string, string> headers) =>
headers
.Where(x => x.IsPropagationHeader())
.ToDictionary(
x => x.Key.Substring(10, x.Key.Length - 10),
x => x.Value);
internal static bool IsPropagationHeader(this KeyValuePair<string, string> header)
=> header.Key.StartsWith("propagate.", StringComparison.OrdinalIgnoreCase);
}
using System.Security.Claims;
using Newtonsoft.Json;
using NServiceBus.Faults;
using NServiceBus.Pipeline;
namespace Utilities.NServiceBus;
public static partial class HeaderPropagationExtensions
{
public static IEnumerable<Claim> Claims(this IIncomingPhysicalMessageContext context)
=> Claims(context.MessageHeaders);
public static IEnumerable<Claim> Claims(this MutateIncomingTransportMessageContext context)
=> Claims(context.Headers);
public static IEnumerable<Claim> Claims(this IOutgoingPhysicalMessageContext context)
=> Claims(context.Headers);
public static IEnumerable<Claim> Claims(this MutateOutgoingTransportMessageContext context)
=> Claims(context.OutgoingHeaders);
public static IEnumerable<Claim> Claims(this FailedMessage message)
=> Claims(message.Headers);
public static ClaimsPrincipal ClaimsAsPrincipal(this FailedMessage message) =>
ClaimsAsPrincipal(message.Headers);
#region ClaimsAsPrincipal
public static ClaimsPrincipal ClaimsAsPrincipal(this IMessageHandlerContext context) =>
ClaimsAsPrincipal(context.MessageHeaders);
public static ClaimsPrincipal ClaimsAsPrincipal(this IReadOnlyDictionary<string, string> headers)
{
var claimsIdentity = new ClaimsIdentity(Claims(headers));
var claimsPrincipal = new ClaimsPrincipal();
claimsPrincipal.AddIdentity(claimsIdentity);
return claimsPrincipal;
}
public static Guid UserId(this IMessageHandlerContext context) =>
Guid.Parse(GetClaim(context.MessageHeaders, ClaimTypes.UserData));
public static Guid RoleId(this IMessageHandlerContext context) =>
Guid.Parse(GetClaim(context.MessageHeaders, ClaimTypes.Role));
public static string Email(this IMessageHandlerContext context) =>
GetClaim(context.MessageHeaders, ClaimTypes.Email);
static string GetClaim(IReadOnlyDictionary<string, string> headers, string claimSuffix)
{
var key = "propagate.claim." + claimSuffix;
if (!headers.TryGetValue(key, out var value))
{
throw new($"Claim not found: {key}");
}
var values = JsonConvert.DeserializeObject<List<string>>(value)!;
return values.Single();
}
#endregion
public static IEnumerable<Claim> Claims(this IMessageHandlerContext context)
=> Claims(context.MessageHeaders);
#region AddClaims
public static void AddClaims(this SendOptions options, List<Claim> claims)
{
var claimsHeaders = new Dictionary<string, string>();
AppendClaims(claims, claimsHeaders);
foreach (var claimHeader in claimsHeaders)
{
options.SetHeader(claimHeader.Key, claimHeader.Value);
}
}
#endregion
public static void AppendClaim(Claim claim, IDictionary<string, string> headers)
=> AppendClaims(
new[]
{
claim
},
headers);
#region AppendClaims
public static void AppendClaims(IEnumerable<Claim> claims, IDictionary<string, string> headers)
{
const string prefix = "propagate.claim.";
foreach (var claim in claims.GroupBy(x => x.Type))
{
var items = claim.Select(x => x.Value);
headers.Add(prefix + claim.Key, JsonConvert.SerializeObject(items));
}
}
#endregion
public static IEnumerable<Claim> Claims(IReadOnlyDictionary<string, string> headers)
{
const string? prefix = "claim.";
foreach (var header in PropagationHeaders(headers))
{
var key = header.Key;
if (!key.StartsWith(prefix))
{
continue;
}
key = key.Substring(prefix.Length, key.Length - prefix.Length);
var list = JsonConvert.DeserializeObject<List<string>>(header.Value)!;
foreach (var value in list)
{
yield return new(key, value);
}
}
}
public static string UpnClaim(this IEnumerable<Claim> claims)
{
var claim = claims.SingleOrDefault(x => string.Equals(x.Type, ClaimTypes.Upn, StringComparison.OrdinalIgnoreCase));
if (claim != null)
{
return claim.Value;
}
throw new("No upn claim found.");
}
}
namespace Utilities.NServiceBus;
/// <summary>
/// Mutator that finds the 'propagate.*' headers and adds them to the outgoing message's headers.
/// This makes it available for downstream handlers.
/// </summary>
public class PropagateHeadersMutator : IMutateOutgoingMessages
{
public Task MutateOutgoing(MutateOutgoingMessageContext context)
{
var outgoing = context.OutgoingHeaders;
outgoing.Add("NServiceBus.OriginatingVersion", AssemblyInfo.Version.ToString());
outgoing.Add("NServiceBus.OriginatingSha", AssemblyInfo.Sha);
if (!context.TryGetIncomingHeaders(out var incomingHeaders))
{
return Task.CompletedTask;
}
foreach (var header in incomingHeaders)
{
if (header.IsPropagationHeader())
{
outgoing.Add(header.Key, header.Value);
}
}
return Task.CompletedTask;
}
}
would love to see something similar OOTB in NSB.
let me know if u want the tests