
Natively support Memory<byte>

Open verdie-g opened this issue 1 year ago • 8 comments

Closes https://github.com/confluentinc/confluent-kafka-dotnet/issues/1238, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1603, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1725, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2177, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1782, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2367.

Problem

The current serializer interface, ISerializer, forces the user to return a byte[]. This is a significant performance problem for two reasons:

  1. It prevents sending a slice of a byte[]. In the following example, a protobuf object is serialized into a MemoryStream. To pass these bytes to the Kafka client, MemoryStream.GetBuffer, which returns the underlying buffer, can't be used because it would return the data plus some extra bytes. So MemoryStream.ToArray has to be used, which copies the entire buffer.
ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, byte[]> producer = new ProducerBuilder<Null, byte[]>(config).Build();

MemoryStream ms = new();
Data.WriteTo(ms);
byte[] buffer = ms.ToArray(); // Copy the whole thing :/
_producer.Produce("demo", new Message<Null, byte[]> { Value = buffer });
  2. It prevents memory pooling. ArrayPool<T>.Rent can return a buffer larger than requested, and the Kafka client doesn't support sending a slice of it. That is a corollary of the first problem, but it's worth mentioning because each buffer passed to Produce is pinned, which puts significant pressure on the GC. If pooling were available, we could create a pool of buffers on the POH (pinned object heap) using GC.AllocateArray<T> (see the sketch after this list).
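
A minimal sketch of that idea, assuming the Memory<byte> support proposed in this PR. The PinnedBufferPool type and its API are purely illustrative and not part of this PR or the library:

using System.Collections.Concurrent;

// Illustrative pool of pinned buffers: arrays are allocated once on the pinned
// object heap and reused, so producing never has to pin fresh GC-heap memory.
public sealed class PinnedBufferPool
{
    private readonly ConcurrentBag<byte[]> _buffers = new();
    private readonly int _bufferSize;

    public PinnedBufferPool(int bufferSize) => _bufferSize = bufferSize;

    public byte[] Rent()
    {
        // Reuse a previously returned buffer, or allocate a new pinned one.
        return _buffers.TryTake(out byte[] buffer)
            ? buffer
            : GC.AllocateArray<byte>(_bufferSize, pinned: true);
    }

    public void Return(byte[] buffer) => _buffers.Add(buffer);
}

A producer could then rent a pinned buffer, serialize into it, produce only the written slice as ReadOnlyMemory<byte>, and return the buffer afterwards.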

Solution

There are several open PRs aiming to extend the API to allow low-allocation produce, but they all have caveats:

  • https://github.com/confluentinc/confluent-kafka-dotnet/pull/1603 tries to support Span, but given the constraints of that type, it means creating a whole new set of APIs.
  • https://github.com/confluentinc/confluent-kafka-dotnet/pull/1725 suggests using IBufferWriter instead, which, unlike byte[] or Span, supports non-contiguous memory, somewhat like Stream, but enables many more ways to reduce allocations. I don't think IBufferWriter is ideal here because, in the end, a contiguous memory buffer is expected for both the key and the value. The PR also introduces several breaking changes.
  • https://github.com/confluentinc/confluent-kafka-dotnet/pull/2367 simply adds a new Produce method that takes ArraySegments as parameters. This is a little awkward because you need to create a Producer<Null, Null>, and it's also inconsistent with how you would produce byte[] (using Producer<byte[], byte[]>).
  • https://github.com/confluentinc/confluent-kafka-dotnet/pull/2177, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219, and https://github.com/confluentinc/confluent-kafka-dotnet/pull/1782 propose supporting ArraySegment, which does not break the API and fixes the original problem. However, it would prevent the use of Memory<byte>, a more general type that is preferred for contiguous memory in situations where Span<byte> can't be used (for example, across await points). This type is used, for example, in Stream.WriteAsync, Socket.SendAsync, and MemoryPool.Rent.

Instead of adding a third kind of serializer (after ISerializer and IAsyncSerializer), this PR special-cases Memory<byte> (and ReadOnlyMemory<byte>) so that, from the user's point of view, they behave just like byte[]: Message.Key and Message.Value are used directly.

So in the end, you can write:

ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, ReadOnlyMemory<byte>> producer = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();

MemoryStream ms = new();
Data.WriteTo(ms);
ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
_producer.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });

Benchmark

Here is a BenchmarkDotNet benchmark that serializes a ~10 KB protobuf object from open-telemetry/opentelemetry-proto and produces it.

  • ProduceByteArray copies the MemoryStream to a byte[].
  • ProduceMemory gets a slice of the MemoryStream with no copy. It's 35% faster and uses 20% less memory.
  • ProducePooledMemory is similar to ProduceMemory but uses Microsoft.IO.RecyclableMemoryStream to show how pooling could work. It's 32% faster and uses 83% less memory.

In the real world, the benefits could be much higher because the benchmark doesn't capture the reduced GC pressure from the avoided allocations, nor the benefit of reusing pinned buffers.

Results

Method               Mean      Error     StdDev     Ratio  RatioSD  Allocated  Alloc Ratio
ProduceByteArray     88.26 us  5.763 us  14.774 us  1.02   0.23     44.22 KB   1.00
ProduceMemory        57.74 us  1.987 us  5.540 us   0.67   0.11     35.37 KB   0.80
ProducePooledMemory  60.36 us  2.582 us  7.449 us   0.70   0.13     7.51 KB    0.17

Code

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Confluent.Kafka;
using Google.Protobuf;
using Microsoft.IO;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Metrics.V1;

BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly)
    .Run(Array.Empty<string>(), new BenchmarkDotNet.Configs.DebugInProcessConfig());

[MemoryDiagnoser]
public class ProduceBench
{
    private static readonly MetricsData Data = GenerateData();

    private static readonly RecyclableMemoryStreamManager MemoryManager = new(new RecyclableMemoryStreamManager.Options
    {
        BlockSize = 10 * 1024,
    });

    private static MetricsData GenerateData()
    {
        return new MetricsData
        {
            ResourceMetrics =
            {
                new ResourceMetrics
                {
                    ScopeMetrics =
                    {
                        new ScopeMetrics
                        {
                            Metrics =
                            {
                                Enumerable.Range(0, 100).Select(_ => new Metric
                                {
                                    Name = "example_gauge",
                                    Description = "An example of a gauge metric",
                                    Unit = "ms",
                                    Gauge = new Gauge
                                    {
                                        DataPoints =
                                        {
                                            new NumberDataPoint
                                            {
                                                StartTimeUnixNano = 1,
                                                TimeUnixNano = 2,
                                                AsDouble = 123.45,
                                                Attributes =
                                                {
                                                    new KeyValue
                                                    {
                                                        Key = "key2",
                                                        Value = new AnyValue { StringValue = "value2" },
                                                    }
                                                }
                                            }
                                        }
                                    }
                                })
                            }
                        }
                    }
                }
            }
        };
    }

    private IProducer<Null, byte[]> _producer1 = default!;
    private IProducer<Null, ReadOnlyMemory<byte>> _producer2 = default!;

    [GlobalSetup]
    public void GlobalSetup()
    {
        ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
        _producer1 = new ProducerBuilder<Null, byte[]>(config).Build();
        _producer2 = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();
    }

    [GlobalCleanup]
    public void GlobalCleanup()
    {
        _producer1.Dispose();
        _producer2.Dispose();
    }

    [IterationSetup]
    public void IterationSetup()
    {
        _producer1.Flush();
        _producer2.Flush();
    }

    [Benchmark(Baseline = true)]
    public void ProduceByteArray()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        _producer1.Produce("demo", new Message<Null, byte[]> { Value = ms.ToArray() });
    }

    [Benchmark]
    public void ProduceMemory()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }

    [Benchmark]
    public void ProducePooledMemory()
    {
        using var ms = MemoryManager.GetStream();
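        // Cast to Stream to pick protobuf's WriteTo(Stream) overload; RecyclableMemoryStream
        // also implements IBufferWriter<byte>, which would otherwise make the call ambiguous.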
        Data.WriteTo((Stream)ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }
}

Production test

We also load-tested a production service that uses Kafka extensively.

Before: image

After: image

With the change, latency on the third step of the load test is much better; before the change, some requests even failed.

verdie-g, Sep 13 '24 21:09

CLA assistant check
All committers have signed the CLA.

cla-assistant[bot], Sep 13 '24 21:09

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

cla-assistant[bot], Sep 13 '24 21:09

There seems to be no progress on any of the several approaches to reducing allocations on the publishing side of the Kafka client. All of them are stuck in the review phase.

We are affected by the issue as well and would really like to see how we can push this topic so that it finally gets the attention it deserves.

@verdie-g Did you end up forking the project and adding your changes?

bmesetovic, Nov 11 '24 12:11

We used a fork with https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219 just to confirm the significant performance benefits, but we were not ready to maintain a fork.

We are still waiting for a review on that PR.

verdie-g, Nov 11 '24 12:11

:tada: All Contributor License Agreements have been signed. Ready to merge.
:white_check_mark: verdie-g
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@Claimundefine @rayokota could you have a look at this PR 🙏 It's been sitting for a while and I believe it fixes an important performance issue in the client.

verdie-g, Apr 14 '25 17:04

or @anchitj @emasab 🥺

verdie-g, Apr 22 '25 18:04

I also would love to see this feature, @anchitj @emasab, give this Dino some hugs from your side

Scylin232, May 02 '25 09:05