NServiceBus icon indicating copy to clipboard operation
NServiceBus copied to clipboard

Provide configuration for header propagation

Open mikeminutillo opened this issue 3 years ago • 1 comments
trafficstars

Occasionally we see a need to copy headers from an incoming message to an outgoing message. The framework already does this for a few headers: for example, NServiceBus.ConversationId is copied as-is, and NServiceBus.MessageId is copied into NServiceBus.RelatedTo. To propagate any other header, a custom behavior is required.

What if, instead, there was a public API to configure this type of header propagation:

var headerPropagation = endpointConfiguration.EnableHeaderPropagation();

// Copies the CustomerId header from incoming message to outgoing message
headerPropagation.CopyHeader("CustomerId");

// Copies the value from the OrderId header on the incoming message to the TriggeringOrderId header on the outgoing message
headerPropagation.CopyHeader("OrderId", "TriggeringOrderId");

This would allow end users to make certain headers "sticky", carrying context through a conversation without having to override the framework's conversation id.

mikeminutillo avatar May 04 '22 08:05 mikeminutillo

Our header propagation uses a `propagate.` prefix to mark a header that should be propagated. We also have many NSB helpers for claims:

using NServiceBus.Extensibility;
using NServiceBus.Faults;
using NServiceBus.Pipeline;

namespace Utilities.NServiceBus;

/// <summary>
/// Helpers for writing and reading headers whose key starts with the <c>propagate.</c> prefix.
/// </summary>
public static partial class HeaderPropagationExtensions
{
    /// <summary>
    /// Key prefix that marks a header as a propagation header.
    /// Single source of truth for both writing the prefix and stripping it again.
    /// </summary>
    const string PropagatePrefix = "propagate.";

    /// <summary>
    /// Stores <paramref name="value"/> in <paramref name="headers"/> under the key
    /// <c>propagate.{key}</c>, replacing any existing entry with that key.
    /// </summary>
    /// <param name="headers">The header dictionary to write to.</param>
    /// <param name="key">The un-prefixed header name; checked with <c>Guard.AgainstEmpty</c>.</param>
    /// <param name="value">The header value.</param>
    public static void SetPropagateHeader(IDictionary<string, string> headers, string key, string value)
    {
        Guard.AgainstEmpty(key, nameof(key));
        headers[PropagatePrefix + key] = value;
    }

    /// <summary>
    /// Sets the header <c>propagate.{key}</c> on <paramref name="options"/> via <c>SetHeader</c>.
    /// </summary>
    /// <param name="options">The send/publish/reply options to set the header on.</param>
    /// <param name="key">The un-prefixed header name; checked with <c>Guard.AgainstEmpty</c>.</param>
    /// <param name="value">The header value.</param>
    public static void SetPropagateHeader(this ExtendableOptions options, string key, string value)
    {
        Guard.AgainstEmpty(key, nameof(key));
        options.SetHeader(PropagatePrefix + key, value);
    }

    /// <summary>Returns the propagation headers found in <c>context.MessageHeaders</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this IIncomingPhysicalMessageContext context)
        => PropagationHeaders(context.MessageHeaders);

    /// <summary>Returns the propagation headers found in <c>context.Headers</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this MutateIncomingTransportMessageContext context)
        => PropagationHeaders(context.Headers);

    /// <summary>Returns the propagation headers found in <c>context.Headers</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this IOutgoingPhysicalMessageContext context)
        => PropagationHeaders(context.Headers);

    /// <summary>Returns the propagation headers found in <c>context.OutgoingHeaders</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this MutateOutgoingTransportMessageContext context)
        => PropagationHeaders(context.OutgoingHeaders);

    /// <summary>Returns the propagation headers found in <c>failedMessage.Headers</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this FailedMessage failedMessage)
        => PropagationHeaders(failedMessage.Headers);

    /// <summary>Returns the propagation headers found in <c>context.MessageHeaders</c>, keyed without the prefix.</summary>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(this IMessageHandlerContext context)
        => PropagationHeaders(context.MessageHeaders);

    /// <summary>
    /// Filters <paramref name="headers"/> down to the propagation headers and returns them
    /// in a new dictionary whose keys have the <c>propagate.</c> prefix removed.
    /// </summary>
    /// <param name="headers">The full set of message headers.</param>
    /// <returns>A new dictionary of un-prefixed key to value.</returns>
    /// <remarks>
    /// The prefix match is case-insensitive, so two headers that differ only in the casing
    /// of the prefix map to the same key and make <c>ToDictionary</c> throw.
    /// </remarks>
    public static IReadOnlyDictionary<string, string> PropagationHeaders(IReadOnlyDictionary<string, string> headers) =>
        headers
            .Where(x => x.IsPropagationHeader())
            .ToDictionary(
                // Strip the prefix by its actual length instead of a hard-coded 10.
                x => x.Key.Substring(PropagatePrefix.Length),
                x => x.Value);

    /// <summary>
    /// Returns <c>true</c> when the header key starts with <c>propagate.</c> (ordinal, ignoring case).
    /// </summary>
    internal static bool IsPropagationHeader(this KeyValuePair<string, string> header)
        => header.Key.StartsWith(PropagatePrefix, StringComparison.OrdinalIgnoreCase);
}
using System.Security.Claims;
using Newtonsoft.Json;
using NServiceBus.Faults;
using NServiceBus.Pipeline;

namespace Utilities.NServiceBus;

/// <summary>
/// Helpers for carrying <see cref="Claim"/>s in message headers. Each claim type is stored
/// under the key <c>propagate.claim.{type}</c> with a JSON array of that type's values.
/// </summary>
public static partial class HeaderPropagationExtensions
{
    /// <summary>Reads the claims stored in <c>context.MessageHeaders</c>.</summary>
    public static IEnumerable<Claim> Claims(this IIncomingPhysicalMessageContext context)
        => Claims(context.MessageHeaders);

    /// <summary>Reads the claims stored in <c>context.Headers</c>.</summary>
    public static IEnumerable<Claim> Claims(this MutateIncomingTransportMessageContext context)
        => Claims(context.Headers);

    /// <summary>Reads the claims stored in <c>context.Headers</c>.</summary>
    public static IEnumerable<Claim> Claims(this IOutgoingPhysicalMessageContext context)
        => Claims(context.Headers);

    /// <summary>Reads the claims stored in <c>context.OutgoingHeaders</c>.</summary>
    public static IEnumerable<Claim> Claims(this MutateOutgoingTransportMessageContext context)
        => Claims(context.OutgoingHeaders);

    /// <summary>Reads the claims stored in <c>message.Headers</c>.</summary>
    public static IEnumerable<Claim> Claims(this FailedMessage message)
        => Claims(message.Headers);

    /// <summary>Builds a <see cref="ClaimsPrincipal"/> from the claims stored in <c>message.Headers</c>.</summary>
    public static ClaimsPrincipal ClaimsAsPrincipal(this FailedMessage message) =>
        ClaimsAsPrincipal(message.Headers);

    #region ClaimsAsPrincipal

    /// <summary>Builds a <see cref="ClaimsPrincipal"/> from the claims stored in <c>context.MessageHeaders</c>.</summary>
    public static ClaimsPrincipal ClaimsAsPrincipal(this IMessageHandlerContext context) =>
        ClaimsAsPrincipal(context.MessageHeaders);

    /// <summary>
    /// Builds a <see cref="ClaimsPrincipal"/> holding a single <see cref="ClaimsIdentity"/>
    /// populated with the claims read from <paramref name="headers"/>.
    /// </summary>
    /// <param name="headers">The message headers to read claims from.</param>
    /// <returns>A new principal; the identity is created without an authentication type.</returns>
    public static ClaimsPrincipal ClaimsAsPrincipal(this IReadOnlyDictionary<string, string> headers)
    {
        var claimsIdentity = new ClaimsIdentity(Claims(headers));
        var claimsPrincipal = new ClaimsPrincipal();
        claimsPrincipal.AddIdentity(claimsIdentity);
        return claimsPrincipal;
    }

    /// <summary>
    /// Parses the single <see cref="ClaimTypes.UserData"/> claim value as a <see cref="Guid"/>.
    /// </summary>
    /// <exception cref="Exception">The claim header is missing.</exception>
    public static Guid UserId(this IMessageHandlerContext context) =>
        Guid.Parse(GetClaim(context.MessageHeaders, ClaimTypes.UserData));

    /// <summary>
    /// Parses the single <see cref="ClaimTypes.Role"/> claim value as a <see cref="Guid"/>.
    /// </summary>
    /// <exception cref="Exception">The claim header is missing.</exception>
    public static Guid RoleId(this IMessageHandlerContext context) =>
        Guid.Parse(GetClaim(context.MessageHeaders, ClaimTypes.Role));

    /// <summary>Returns the single <see cref="ClaimTypes.Email"/> claim value.</summary>
    /// <exception cref="Exception">The claim header is missing.</exception>
    public static string Email(this IMessageHandlerContext context) =>
        GetClaim(context.MessageHeaders, ClaimTypes.Email);

    /// <summary>
    /// Looks up the header <c>propagate.claim.{claimSuffix}</c> (exact key match) and returns
    /// its one and only value.
    /// </summary>
    /// <exception cref="Exception">The header is not present.</exception>
    /// <exception cref="InvalidOperationException">The header holds zero or more than one value.</exception>
    static string GetClaim(IReadOnlyDictionary<string, string> headers, string claimSuffix)
    {
        var key = "propagate.claim." + claimSuffix;
        if (!headers.TryGetValue(key, out var value))
        {
            // Kept as a plain Exception so existing callers that match on it are unaffected.
            throw new Exception($"Claim not found: {key}");
        }

        var values = JsonConvert.DeserializeObject<List<string>>(value)!;
        return values.Single();
    }

    #endregion

    /// <summary>Reads the claims stored in <c>context.MessageHeaders</c>.</summary>
    public static IEnumerable<Claim> Claims(this IMessageHandlerContext context)
        => Claims(context.MessageHeaders);

    #region AddClaims

    /// <summary>
    /// Serializes <paramref name="claims"/> into claim headers and sets each one on
    /// <paramref name="options"/> via <c>SetHeader</c>.
    /// </summary>
    public static void AddClaims(this SendOptions options, List<Claim> claims)
    {
        var claimsHeaders = new Dictionary<string, string>();
        AppendClaims(claims, claimsHeaders);
        foreach (var claimHeader in claimsHeaders)
        {
            options.SetHeader(claimHeader.Key, claimHeader.Value);
        }
    }

    #endregion

    /// <summary>Adds a single claim to <paramref name="headers"/>; see <c>AppendClaims</c>.</summary>
    public static void AppendClaim(Claim claim, IDictionary<string, string> headers)
        => AppendClaims(
            new[]
            {
                claim
            },
            headers);

    #region AppendClaims

    /// <summary>
    /// Groups <paramref name="claims"/> by type and adds one header per type to
    /// <paramref name="headers"/>: key <c>propagate.claim.{type}</c>, value a JSON array of
    /// the claim values.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// Thrown by <c>headers.Add</c> when a header for that claim type already exists.
    /// </exception>
    public static void AppendClaims(IEnumerable<Claim> claims, IDictionary<string, string> headers)
    {
        const string prefix = "propagate.claim.";
        foreach (var claim in claims.GroupBy(x => x.Type))
        {
            var items = claim.Select(x => x.Value);
            headers.Add(prefix + claim.Key, JsonConvert.SerializeObject(items));
        }
    }

    #endregion

    /// <summary>
    /// Lazily yields one <see cref="Claim"/> per value stored in the claim headers of
    /// <paramref name="headers"/>. Propagation headers that are not claim headers are skipped.
    /// </summary>
    /// <param name="headers">The message headers to read from.</param>
    public static IEnumerable<Claim> Claims(IReadOnlyDictionary<string, string> headers)
    {
        const string prefix = "claim.";
        foreach (var header in PropagationHeaders(headers))
        {
            var key = header.Key;
            // Header keys are identifiers, not linguistic text: compare ordinally so the
            // result does not depend on the current culture.
            if (!key.StartsWith(prefix, StringComparison.Ordinal))
            {
                continue;
            }

            key = key.Substring(prefix.Length);
            var list = JsonConvert.DeserializeObject<List<string>>(header.Value)!;
            foreach (var value in list)
            {
                yield return new(key, value);
            }
        }
    }

    /// <summary>
    /// Returns the value of the single claim whose type equals <see cref="ClaimTypes.Upn"/>
    /// (ordinal, ignoring case).
    /// </summary>
    /// <exception cref="Exception">No such claim exists.</exception>
    /// <exception cref="InvalidOperationException">More than one such claim exists.</exception>
    public static string UpnClaim(this IEnumerable<Claim> claims)
    {
        var claim = claims.SingleOrDefault(x => string.Equals(x.Type, ClaimTypes.Upn, StringComparison.OrdinalIgnoreCase));
        if (claim != null)
        {
            return claim.Value;
        }

        throw new Exception("No upn claim found.");
    }
}
namespace Utilities.NServiceBus;

/// <summary>
/// Mutator that stamps the outgoing message with the originating assembly version and SHA,
/// then finds the 'propagate.*' headers on the incoming message (if any) and adds them to
/// the outgoing message's headers. This makes them available for downstream handlers.
/// </summary>
public class PropagateHeadersMutator : IMutateOutgoingMessages
{
    /// <summary>
    /// Adds the origin stamps and copies propagation headers onto the outgoing message.
    /// </summary>
    /// <param name="context">The outgoing message context whose headers are mutated.</param>
    /// <returns>An already-completed task; all work is done synchronously.</returns>
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        var outgoing = context.OutgoingHeaders;

        // Indexer assignment instead of Add: an already-present stamp must not make the
        // send fail with an ArgumentException.
        outgoing["NServiceBus.OriginatingVersion"] = AssemblyInfo.Version.ToString();
        outgoing["NServiceBus.OriginatingSha"] = AssemblyInfo.Sha;

        // No incoming message in scope means there is nothing to propagate.
        if (!context.TryGetIncomingHeaders(out var incomingHeaders))
        {
            return Task.CompletedTask;
        }

        foreach (var header in incomingHeaders)
        {
            if (!header.IsPropagationHeader())
            {
                continue;
            }

            // A propagate.* header set explicitly on the outgoing message wins over the
            // inherited value; previously this collision threw from Dictionary.Add.
            if (!outgoing.ContainsKey(header.Key))
            {
                outgoing.Add(header.Key, header.Value);
            }
        }

        return Task.CompletedTask;
    }
}

Would love to see something similar out of the box (OOTB) in NSB.

Let me know if you want the tests.

SimonCropp avatar Jun 27 '22 04:06 SimonCropp