confluent-kafka-dotnet
Natively support Memory<byte>
Closes https://github.com/confluentinc/confluent-kafka-dotnet/issues/1238, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1603, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1725, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2177, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219, https://github.com/confluentinc/confluent-kafka-dotnet/pull/1782, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2367.
Problem
The current serializer interface, ISerializer, forces the user to return a byte[]. This is a significant performance problem for two reasons:
- It prevents sending a slice of a byte[]. In the following example, a protobuf object is serialized into a MemoryStream. To hand these bytes to the Kafka client, MemoryStream.GetBuffer, which returns the underlying buffer, can't be used because it would return the data plus some extra bytes, so MemoryStream.ToArray has to be used, which copies the entire buffer.
ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, byte[]> producer = new ProducerBuilder<Null, byte[]>(config).Build();
MemoryStream ms = new();
Data.WriteTo(ms);
byte[] buffer = ms.ToArray(); // Copy the whole thing :/
_producer.Produce("demo", new Message<Null, byte[]> { Value = buffer });
- It prevents memory pooling. ArrayPool<T>.Rent can return a buffer larger than requested, and the Kafka client doesn't support sending a "slice" of it. That's a corollary of the first problem, but it's worth mentioning because each buffer passed to Produce is pinned, which puts significant pressure on the GC. If pooling were available, we could create a pool of buffers on the POH (pinned object heap) using GC.AllocateArray<T>, as sketched below.
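To illustrate that second point, here is a minimal sketch (not part of this PR) of such a pinned-buffer pool. The class name and sizing policy are made up for illustration; the point is that a rented buffer is rarely filled completely, so the producer needs to accept a slice of it to avoid a trimming copy.

using System.Collections.Concurrent;

// Hypothetical pinned-buffer pool, for illustration only.
// Buffers live on the pinned object heap (POH), so they never move and never
// need to be pinned again when handed to native code.
public sealed class PinnedBufferPool
{
    private readonly int _bufferSize;
    private readonly ConcurrentBag<byte[]> _buffers = new();

    public PinnedBufferPool(int bufferSize) => _bufferSize = bufferSize;

    public byte[] Rent() =>
        _buffers.TryTake(out byte[]? buffer)
            ? buffer
            : GC.AllocateArray<byte>(_bufferSize, pinned: true);

    public void Return(byte[] buffer) => _buffers.Add(buffer);
}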
Solution
Several PRs are open that aim to extend the API to allow low-allocation produce, but they all have caveats:
- https://github.com/confluentinc/confluent-kafka-dotnet/pull/1603 tries to support Span, but given the constraints of that type, it means creating a whole new set of APIs.
- https://github.com/confluentinc/confluent-kafka-dotnet/pull/1725 suggests using IBufferWriter instead, which, unlike byte[] or Span, supports non-contiguous memory, a bit like Stream, and enables a lot of ways to reduce allocations. I don't think IBufferWriter is ideal here because, in the end, a contiguous memory buffer is expected for both the key and the value. Also, the PR introduces several breaking changes.
- https://github.com/confluentinc/confluent-kafka-dotnet/pull/2367 simply adds a new Produce method that takes ArraySegments as parameters. This is a little awkward because you need to create a Producer<Null, Null>, and it's also inconsistent with how you would produce byte[] (using Producer<byte[], byte[]>).
- https://github.com/confluentinc/confluent-kafka-dotnet/pull/2177, https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219, and https://github.com/confluentinc/confluent-kafka-dotnet/pull/1782 propose supporting ArraySegment, which doesn't break the API and fixes the original problem. However, it would prevent the use of Memory<byte>, which is a more general type, preferred when dealing with contiguous memory where Span<byte> can't be used. This type is used, for example, in Stream.WriteAsync, Socket.SendAsync, and MemoryPool.Rent (see the snippet after this list).
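As a small illustration of why Memory<byte> fits the rest of the BCL better than ArraySegment<byte> (the helper below is hypothetical, not from any of these PRs): a buffer rented from MemoryPool<byte> flows straight into Memory<byte>-based APIs, while an ArraySegment-based API needs an extra, fallible conversion.

using System.Buffers;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

public static class MemoryVsArraySegmentDemo
{
    // Hypothetical helper showing a pooled buffer flowing through Memory<byte>-based APIs.
    public static async Task WriteToStreamAsync(Stream destination, int payloadSize)
    {
        using IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(payloadSize);
        Memory<byte> buffer = owner.Memory.Slice(0, payloadSize); // exact-size slice, no copy

        // ... fill 'buffer' with serialized data ...

        // Memory<byte> is accepted directly by modern BCL APIs such as Stream.WriteAsync.
        await destination.WriteAsync(buffer);

        // An ArraySegment<byte>-based API would need this extra conversion step,
        // which can fail because a Memory<byte> is not necessarily backed by an array.
        if (MemoryMarshal.TryGetArray<byte>(buffer, out ArraySegment<byte> segment))
        {
            // pass 'segment' to the ArraySegment-based overload
        }
    }
}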
Instead of adding a third type of serializer (after ISerializer and IAsyncSerializer), this PR special-cases Memory<byte> (and ReadOnlyMemory<byte>) so that, just like with byte[], Message.Key and Message.Value are used directly from the user's point of view.
So in the end, you can write
ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
IProducer<Null, ReadOnlyMemory<byte>> producer = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();
MemoryStream ms = new();
Data.WriteTo(ms);
ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
_producer.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
Benchmark
Here is a benchmark, using BenchmarkDotNet, that serializes a ~10 KB protobuf object from open-telemetry/opentelemetry-proto and produces it.
- ProduceByteArray copies the MemoryStream to a byte[]
- ProduceMemory gets a slice of the MemoryStream with no copy. It's 35% faster and uses 20% less memory
- ProducePooledMemory is similar to ProduceMemory but uses Microsoft.IO.RecyclableMemoryStream to show how pooling could work. It's 32% faster and uses 83% less memory
In the real world, the benefits could be even greater, because the benchmark doesn't capture the reduced GC pressure from the avoided allocations and from the pinned buffers being reused.
Results
| Method | Mean | Error | StdDev | Ratio | RatioSD | Allocated | Alloc Ratio |
|---|---|---|---|---|---|---|---|
| ProduceByteArray | 88.26 us | 5.763 us | 14.774 us | 1.02 | 0.23 | 44.22 KB | 1.00 |
| ProduceMemory | 57.74 us | 1.987 us | 5.540 us | 0.67 | 0.11 | 35.37 KB | 0.80 |
| ProducePooledMemory | 60.36 us | 2.582 us | 7.449 us | 0.70 | 0.13 | 7.51 KB | 0.17 |
Code
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Confluent.Kafka;
using Google.Protobuf;
using Microsoft.IO;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Metrics.V1;

BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly)
    .Run(Array.Empty<string>(), new BenchmarkDotNet.Configs.DebugInProcessConfig());

[MemoryDiagnoser]
public class ProduceBench
{
    private static readonly MetricsData Data = GenerateData();

    private static readonly RecyclableMemoryStreamManager MemoryManager = new(new RecyclableMemoryStreamManager.Options
    {
        BlockSize = 10 * 1024,
    });

    private static MetricsData GenerateData()
    {
        return new MetricsData
        {
            ResourceMetrics =
            {
                new ResourceMetrics
                {
                    ScopeMetrics =
                    {
                        new ScopeMetrics
                        {
                            Metrics =
                            {
                                Enumerable.Range(0, 100).Select(_ => new Metric
                                {
                                    Name = "example_gauge",
                                    Description = "An example of a gauge metric",
                                    Unit = "ms",
                                    Gauge = new Gauge
                                    {
                                        DataPoints =
                                        {
                                            new NumberDataPoint
                                            {
                                                StartTimeUnixNano = 1,
                                                TimeUnixNano = 2,
                                                AsDouble = 123.45,
                                                Attributes =
                                                {
                                                    new KeyValue
                                                    {
                                                        Key = "key2",
                                                        Value = new AnyValue { StringValue = "value2" },
                                                    }
                                                }
                                            }
                                        }
                                    }
                                })
                            }
                        }
                    }
                }
            }
        };
    }

    private IProducer<Null, byte[]> _producer1 = default!;
    private IProducer<Null, ReadOnlyMemory<byte>> _producer2 = default!;

    [GlobalSetup]
    public void GlobalSetup()
    {
        ProducerConfig config = new() { BootstrapServers = "localhost:9092" };
        _producer1 = new ProducerBuilder<Null, byte[]>(config).Build();
        _producer2 = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();
    }

    [GlobalCleanup]
    public void GlobalCleanup()
    {
        _producer1.Dispose();
        _producer2.Dispose();
    }

    [IterationSetup]
    public void IterationSetup()
    {
        _producer1.Flush();
        _producer2.Flush();
    }

    // Baseline: copy the MemoryStream into a new byte[] for every message.
    [Benchmark(Baseline = true)]
    public void ProduceByteArray()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        _producer1.Produce("demo", new Message<Null, byte[]> { Value = ms.ToArray() });
    }

    // Produce a slice of the MemoryStream's underlying buffer, with no copy.
    [Benchmark]
    public void ProduceMemory()
    {
        MemoryStream ms = new();
        Data.WriteTo(ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }

    // Same as ProduceMemory, but the stream's buffers come from a pool.
    [Benchmark]
    public void ProducePooledMemory()
    {
        using var ms = MemoryManager.GetStream();
        // Cast needed because RecyclableMemoryStream also implements IBufferWriter<byte>,
        // which would make the WriteTo call ambiguous.
        Data.WriteTo((Stream)ms);
        ReadOnlyMemory<byte> mem = ms.GetBuffer().AsMemory(0, (int)ms.Length);
        _producer2.Produce("demo", new Message<Null, ReadOnlyMemory<byte>> { Value = mem });
    }
}
Production test
We also load-tested a production service that uses Kafka extensively.
Before:
After:
With the change, the latency at the third step of the load test is significantly better, whereas before the change some requests even failed.
There seems to be no progress on any of the several approaches to reduce allocations on the publishing side of Kafka. All of them are stuck at the review phase.
We are affected by the issue as well and would really like to see how we can push this topic so that it finally gets the attention it deserves.
@verdie-g Did you end up forking the project and adding your changes?
We used a fork with https://github.com/confluentinc/confluent-kafka-dotnet/pull/2219 just to confirm the significant performance benefits, but we weren't ready to maintain a fork.
We are still waiting for a review on that PR.
@Claimundefine @rayokota could you have a look at this PR 🙏 It's been sitting for a while and I believe it fixes an important performance issue in the client.
or @anchitj @emasab 🥺
I also would love to see this feature, @anchitj @emasab, give this Dino some hugs from your side