azure-sdk-for-net

Same PartitionKey does not guarantee same Partition

Open NickKimpe opened this issue 1 year ago • 6 comments

Hello,

Issue: Setting the same PartitionKey does not guarantee that all events land in the same partition when they are sent in separate 'chunks'.

Context: Our downstream processing requires events to be handled in order. We assumed that all events with the same partition key would be processed in order, as long as we supplied a PartitionKey when publishing them to Event Hubs. This did not work as expected. After some troubleshooting, we traced the problem to race conditions caused by parallel processing: the events were spread over different partitions even though the same PartitionKey was supplied.

Our assumption was based on this quote found in this documentation page:

When publishing events, it may be desirable to request that the Event Hubs service keep the different event batches together on the same partition. This can be accomplished by setting a partition key when creating the batch. The partition key is NOT the identifier of a specific partition. Rather, it is an arbitrary piece of string data that Event Hubs uses as the basis to compute a hash value. Event Hubs will associate the hash value with a specific partition, ensuring that any events published with the same partition key are routed to the same partition.
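The routing the quoted documentation describes comes down to a deterministic function from key to partition. The sketch below is a minimal model of that property, assuming a 32-bit FNV-1a hash and a simple modulo over the partition count. The Event Hubs service uses its own hash, which this does not reproduce, and `PartitionKeyRouting` with its methods are hypothetical names for illustration only.

```csharp
using System;
using System.Text;

// Hypothetical illustration only: this is NOT the hash the Event Hubs service uses.
// It models the property the documentation promises: same key -> same partition.
public static class PartitionKeyRouting
{
    // 32-bit FNV-1a over the UTF-8 bytes of the key. Unlike string.GetHashCode(),
    // which is randomized per process on .NET Core, this value is stable everywhere.
    public static uint StableHash(string partitionKey)
    {
        uint hash = 2166136261; // FNV offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(partitionKey))
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u); // FNV prime
        }
        return hash;
    }

    // Maps a key onto a partition index in the range [0, partitionCount).
    public static int PartitionFor(string partitionKey, int partitionCount)
    {
        if (partitionKey is null) throw new ArgumentNullException(nameof(partitionKey));
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(StableHash(partitionKey) % (uint)partitionCount);
    }
}
```

Calling `PartitionFor(key, 32)` once per batch in a 100-batch loop returns the same index every time; that per-key stability is the guarantee the reporters were relying on.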

If the documentation text is right, then how can we achieve this?

Below is some sample code to reproduce the issue.

Sample code to send items:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

namespace ConsoleApp;

internal class Program
{
    private const string ConnectionString = "...";
    private const string EventHubName = "myexperimenthub";
    
    private static readonly EventHubProducerClient ProducerClient = new EventHubProducerClient(ConnectionString, EventHubName);

    private static async Task Main()
    {
        const int amountOfBatches = 100;
        const int batchSize = 20;
        var partitionKey = $"some-partition-key_{Guid.NewGuid()}";

        Console.Write($"Sending '{amountOfBatches * batchSize}' events to event hubs in '{amountOfBatches}' batches of '{batchSize}' items, using '{partitionKey}' as partition key for every batch...");

        var sendEventOptions = new SendEventOptions { PartitionKey = partitionKey };

        for (var batchIndex = 0; batchIndex < amountOfBatches; batchIndex++)
        {
            var eventBodyList = new List<string>();
            for (var eventIndex = 0; eventIndex < batchSize; eventIndex++)
            {
                eventBodyList.Add($"event {eventIndex} from batch {batchIndex}");
            }

            var eventDataList = eventBodyList.Select(eventBody => new EventData(Encoding.UTF8.GetBytes(eventBody)));
            await ProducerClient.SendAsync(eventDataList, sendEventOptions);
        }

        Console.WriteLine("Done");
    }
}

Console output:

Sending '2000' events to event hubs in '100' batches of '20' items, using 'some-partition-key_99e37fa3-ff2b-4edf-9b4f-74aed7ec5cab' as partition key for every batch...Done

Sample code used to receive the events (Azure Function with EventHubTrigger):

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp;

public static class ReceivingFunction
{
    private static readonly ConcurrentBag<EventInfo> InformationStore = new();

    [FunctionName(nameof(ReceivingFunction))]
    public static void Run(
        [EventHubTrigger("myexperimenthub", Connection = "EventHubConnectionString")]
        EventData[] events,
        PartitionContext partitionContext,
        ILogger logger)
    {
        foreach (var eventData in events)
        {
            InformationStore.Add(new EventInfo(eventData.PartitionKey, partitionContext.PartitionId));
        }
        
        var logMessage = InformationStore.ToList().SummarizeEverything();
        logger.LogInformation(logMessage.Replace("\r\n","\n"));
    }
}

public static class LogMessageExtensions
{
    public static string SummarizeEverything(this IEnumerable<EventInfo> bag)
    {
        var summary = new StringBuilder();
        summary.AppendLine("Summary of the situation so far:");
        foreach (var infoByPartitionKey in bag.GroupBy(i => i.PartitionKey))
        {
            var numberOfEvents = infoByPartitionKey.Count();
            var infoByPartitionId = infoByPartitionKey.GroupBy(i => i.PartitionId);

            summary.AppendLine($"'{infoByPartitionId.Count()}' PartitionId(s) were used for PartitionKey '{infoByPartitionKey.Key}' ('{numberOfEvents}' events  received).");
        }

        return summary.ToString();
    }
}

public class EventInfo
{
    public EventInfo(string partitionKey, string partitionId)
    {
        PartitionKey = partitionKey;
        PartitionId = partitionId;
    }

    public string PartitionKey { get; }
    public string PartitionId { get; }
}

Last logged console output:

[2022-08-05T12:21:32.972Z] Summary of the situation so far:
'32' PartitionId(s) were used for PartitionKey 'some-partition-key_99e37fa3-ff2b-4edf-9b4f-74aed7ec5cab' ('2000' events  received).

NickKimpe, Aug 05 '22 12:08

Hi @NickKimpe. Thank you for bringing this to our attention. A regression introduced in #28055 causes the SendAsync overloads that accept an enumerable to omit the partition key when publishing. Because of this, the events trigger round-robin assignment on the Event Hub, which spreads them across partitions.
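To see why a dropped key yields the "'32' PartitionId(s)" result, here is a toy model contrasting the two assignment modes. It assumes that each SendAsync call in the repro is placed on a single partition as a unit, that the hub has 32 partitions (the count observed in the output), and that round-robin is strict rotation; the service's actual distribution may differ. `RoutingModel` and its methods are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

// Toy model, not the service's implementation: one entry per SendAsync call,
// holding the index of the partition that call's events land on.
public static class RoutingModel
{
    // Partition key honored: every call maps to the partition the key hashes to.
    public static int[] AssignWithKey(int sendCount, int keyedPartition) =>
        Enumerable.Repeat(keyedPartition, sendCount).ToArray();

    // Partition key dropped: calls are dealt out across partitions in rotation.
    public static int[] AssignRoundRobin(int sendCount, int partitionCount) =>
        Enumerable.Range(0, sendCount).Select(i => i % partitionCount).ToArray();

    public static int DistinctPartitions(IEnumerable<int> assignments) =>
        assignments.Distinct().Count();

    // Counts adjacent sends that land on different partitions. Each one is a point
    // where a parallel consumer may process later events before earlier ones.
    public static int OrderingHazards(IReadOnlyList<int> assignments)
    {
        int hazards = 0;
        for (int i = 1; i < assignments.Count; i++)
        {
            if (assignments[i] != assignments[i - 1]) hazards++;
        }
        return hazards;
    }
}
```

With the key honored, all 100 sends share one partition and there are no cross-partition ordering hazards. With rotation, the same 100 sends touch all 32 partitions and every adjacent pair is a potential reordering point, which is consistent with the race conditions described in the original report.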

I just put out a fix as well as additional validations in the test suite to better prevent this type of mistake going forward. Until our next release, using an explicit EventDataBatch when publishing with a partition key will ensure that events are properly grouped.

jsquire, Aug 05 '22 19:08

Hi @NickKimpe. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

ghost, Aug 05 '22 19:08

/unresolve

Hi @jsquire, thanks for the quick response. Maybe I'm doing something wrong, but the explicit EventDataBatch option gives us the same results.

FYI, we are using Azure.Messaging.EventHubs version 5.7.1.

So we were wondering: did we use EventDataBatch correctly, or is there another workaround we could use?

This is the adjusted code (from the sample above) to send the events:

        const int amountOfBatches = 100;
        const int batchSize = 20;
        var partitionKey = $"some-partition-key_{Guid.NewGuid()}";

        Console.Write($"Sending '{amountOfBatches * batchSize}' events to event hubs in '{amountOfBatches}' batches of '{batchSize}' items, using '{partitionKey}' as partition key for every batch...");

        var createBatchOptions = new CreateBatchOptions { PartitionKey = partitionKey };

        for (var batchIndex = 0; batchIndex < amountOfBatches; batchIndex++)
        {
            using var eventBatch = await ProducerClient.CreateBatchAsync(createBatchOptions);
            for (var eventIndex = 0; eventIndex < batchSize; eventIndex++)
            {
                if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"event {eventIndex} from batch {batchIndex}"))))
                    throw new Exception("Batch size exceeded.");
            }

            await ProducerClient.SendAsync(eventBatch);
        }

        Console.WriteLine("Done");

But it produces the same result. Output of the sender:

Sending '2000' events to event hubs in '100' batches of '20' items, using 'some-partition-key_cdb1c1d5-73f4-48ca-b68a-26a69bb31b5f' as partition key for every batch...Done

Output of the receiver:

[2022-08-08T08:38:28.660Z] Summary of the situation so far:
'32' PartitionId(s) were used for PartitionKey 'some-partition-key_cdb1c1d5-73f4-48ca-b68a-26a69bb31b5f' ('2000' events  received).

NickKimpe, Aug 08 '22 08:08

Hi @NickKimpe. You're correct. I misread a code path: both publishing paths eventually converge on the code that needed the fix. The fix will be released tomorrow as v5.7.2. Unfortunately, there's no easy mitigation in the meantime. The closest you could get is to assign the partition id manually to force a specific partition.
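A minimal sketch of that interim workaround, assuming you want a stable key-to-partition mapping of your own: hash the key, pick one of the ids returned by `EventHubProducerClient.GetPartitionIdsAsync`, and pass it as `CreateBatchOptions.PartitionId` (or `SendEventOptions.PartitionId`) instead of `PartitionKey`. The `PartitionPinning` class and the choice of FNV-1a are illustrative assumptions, not part of the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (not part of Azure.Messaging.EventHubs): pins a partition key
// to one concrete partition id so batches can be sent with PartitionId instead.
public static class PartitionPinning
{
    public static string PinPartitionId(string partitionKey, IReadOnlyList<string> partitionIds)
    {
        if (partitionKey is null) throw new ArgumentNullException(nameof(partitionKey));
        if (partitionIds is null || partitionIds.Count == 0)
            throw new ArgumentException("At least one partition id is required.", nameof(partitionIds));

        // Sort ordinally so the result does not depend on the order the ids were returned in.
        var ordered = new List<string>(partitionIds);
        ordered.Sort(StringComparer.Ordinal);

        // 32-bit FNV-1a: stable across processes, unlike string.GetHashCode().
        uint hash = 2166136261;
        foreach (byte b in Encoding.UTF8.GetBytes(partitionKey))
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u);
        }

        return ordered[(int)(hash % (uint)ordered.Count)];
    }
}
```

For example, `new CreateBatchOptions { PartitionId = PartitionPinning.PinPartitionId(partitionKey, await ProducerClient.GetPartitionIdsAsync()) }` would route every batch for that key to one partition. Two caveats, stated as assumptions: this mapping will not, in general, match the partition the service picks for the same key, so events published before and after switching back to `PartitionKey` may land on different partitions; and the mapping changes if the partition count changes.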

Apologies for the inconvenience; this one was a big oversight on our part.

jsquire, Aug 08 '22 15:08

v5.7.2 has been released with a fix for this.

jsquire, Aug 09 '22 13:08

Hi @NickKimpe. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

ghost, Aug 09 '22 13:08

Hi @NickKimpe, since you haven’t asked that we “/unresolve” the issue, we’ll close this out. If you believe further discussion is needed, please add a comment “/unresolve” to reopen the issue.

ghost, Aug 16 '22 16:08