confluent-kafka-dotnet
linger.ms sends one message per interval, not batch
Description
Not 100% sure whether this is a Confluent.Kafka issue or a librdkafka issue.
I'm using Confluent.Kafka 1.4.3 (librdkafka 1.4.2) in an Azure Function App and DotNet Core 2.1.
When I set the LingerMs setting to a high value, say 5000, the client sends one message every 5 seconds, not a MessageSet containing all the messages I tried to send during those 5 seconds, just the one message. At that rate, sending 200k messages would take over 11 days!
As I understand it, the client should be batching up messages I'm trying to send within each 5 second interval, and then sending them as one MessageSet to the broker (Confluent Cloud in this case).
I'm viewing the messages as they arrive in the Confluent Cloud UI, and each successive message on a partition has an offset one higher than the previous one, so it's not just the UI displaying one message per MessageSet or anything like that.
My client config:
var pConfig = new ProducerConfig
{
    BootstrapServers = _config["KafkaBootstrapServers"],
    SaslMechanism = SaslMechanism.Plain,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslUsername = _config["KafkaUsername"],
    SaslPassword = _config["KafkaPassword"],
    MessageSendMaxRetries = 10,
    MessageTimeoutMs = 30 * 1000,
    RetryBackoffMs = 250,
    Acks = Acks.Leader,
    LingerMs = 5000,
    CompressionType = CompressionType.Gzip,
    CompressionLevel = 12, // note: librdkafka's usable compression.level range for gzip is 0-9
    SslCaLocation = "D:\\home\\site\\wwwroot\\cacert.pem"
};
Sample code which tries sending as quickly as possible:
public async Task sendToKafkaTopic<T>(string topic, List<T> reportDtos)
{
    var kafkaMessages = new List<Message<string, string>>();
    foreach (var reportDto in reportDtos)
    {
        string msgBody = JsonConvert.SerializeObject(reportDto);
        var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = msgBody };
        kafkaMessages.Add(message);
    }

    foreach (var kafkaMessage in kafkaMessages)
    {
        try
        {
            var deliveryResult = await _producer.ProduceAsync(topic, kafkaMessage);
            _log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
        }
        catch (ProduceException<string, string> e)
        {
            _log.LogInformation($"DEBUG: Kafka exception! (Error: {e.Error}. Message: {e.Message})");
            throw new Exception($"Kafka exception! --> Error: {e.Error}. Message: {e.Message}", e);
        }
    }
}
I've tried sending messages with and without keys - no difference.
How to reproduce
- Create a function app in Azure.
- Configure client with a high value in the LingerMs setting.
- Create a loop that tries to send, say, 100,000 messages to a topic in Confluent Cloud.
Checklist
Please provide the following information:
- [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [x] Confluent.Kafka nuget version.
- [x] Apache Kafka version.
- [x] Client configuration.
- [x] Operating system.
- [ ] Provide logs (with "debug" : "..." as necessary in configuration).
- [ ] Provide broker log excerpts.
- [x] Critical issue.
Sorry about the code formatting there, can't seem to get GitHub to parse that correctly...
Just put it between opening and closing code-fence tags.
Because you're producing synchronously: you produce a message, librdkafka waits 5s for any additional messages to include in the batch (there won't be any, because you're blocking on the delivery), then sends the produce request to the broker. The response comes back, the awaited ProduceAsync unblocks, repeat.
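To illustrate the point: handing messages to the client without awaiting each delivery lets librdkafka fill batches during the linger window. A minimal sketch using the non-blocking `Produce` overload with a delivery handler (broker address and topic name are placeholders, not from the issue):

```csharp
using System;
using Confluent.Kafka;

class FireAndForgetSketch
{
    static void Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder
            LingerMs = 5000
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            for (int i = 0; i < 1000; i++)
            {
                // Produce() enqueues the message and returns immediately, so
                // librdkafka can accumulate a full batch during the 5s linger.
                producer.Produce(
                    "my-topic", // placeholder topic
                    new Message<string, string> { Key = i.ToString(), Value = $"payload-{i}" },
                    deliveryReport =>
                    {
                        if (deliveryReport.Error.IsError)
                            Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
                    });
            }

            // Block until all outstanding deliveries complete (or time out)
            // before the producer is disposed.
            producer.Flush(TimeSpan.FromSeconds(30));
        }
    }
}
```

This requires a reachable broker to actually run; the point is only that nothing blocks between successive `Produce` calls.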
Aha. Thanks. For some reason I had just assumed librdkafka had decoupled this from the thread I was sending from - but yeah, that doesn't really make much sense. Sorry. I also now see that the docs specifically mention this. Double bad on my part :P
What corrections did you implement to make it work asynchronously?
That was over two years ago. The latest version that we used was as follows:
public async Task sendToKafkaTopic<T>(string topic, List<T> reportDtos)
{
    List<Task<DeliveryResult<string, string>>> sendTasks = new List<Task<DeliveryResult<string, string>>>();
    _log.LogInformation("Starting sending to Kafka...");

    foreach (var reportDto in reportDtos)
    {
        string msgBody = JsonConvert.SerializeObject(reportDto);
        var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = msgBody };
        sendTasks.Add(_producer.ProduceAsync(topic, message));
    }

    await Task.WhenAll(sendTasks).ContinueWith((result) =>
    {
        if (result.Exception != null)
        {
            throw new Exception($"Exception was thrown during sending to Kafka! Message: {result.Exception.Message}", result.Exception);
        }
    });

    _log.LogInformation($"Successfully sent {reportDtos.Count} messages to Kafka!");
}
Thank you for your response.