confluent-kafka-dotnet
Bulk Insertion in Kafka .Net Client not waiting for specific time interval
Description
I have a simple Kafka producer. I am trying to send messages in bulk after a specific time interval, but the data is sent to Kafka as soon as the code calls the ProduceAsync method, without waiting for the interval. As I understand it, the producer should wait 20 seconds before producing the data.
Code
public async Task BatchProduceAsync(string clientId)
{
    string topic = "Batching";
    var config = new ProducerConfig
    {
        BootstrapServers = "172.26.99.250:9092",
        ClientId = clientId,
        // linger.ms is the time to wait for more messages before sending a batch to Kafka.
        // A value of 0 means 'send messages as soon as they are ready to be sent'.
        LingerMs = TimeSpan.FromSeconds(20).TotalMilliseconds,
        // batch.size measures batch size in total bytes, not in number of messages.
        // It controls how many bytes of data to collect before sending messages to the Kafka broker.
        // Set this as high as possible, without exceeding available memory.
        BatchSize = 10 * 1024
        // The producer sends a batch when either limit is reached, whichever occurs first:
        // the linger time elapses or the batch size fills up.
    };
    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        for (var i = 0; i < 50; ++i)
        {
            var value = $"Hello World {i}";
            var message = new Message<string, string>()
            {
                Value = value,
                Key = clientId
            };
            var result = producer.ProduceAsync(topic, message);
        }
        producer.Flush();
    }
}
Hi @shujaatsiddiqui, I've checked your code. It looks fine apart from one issue: you don't await the ProduceAsync call. If your method is async, use this:
var result = await producer.ProduceAsync(topic, message);
Or, if your method isn't async, use this:
var result = producer.ProduceAsync(topic, message).GetAwaiter().GetResult();
Then your code works correctly!
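One thing to watch with awaiting inside the loop, as far as I can tell: each await completes only when that message has been delivered, so with a 20-second linger.ms the loop would likely send one message per batch and wait roughly 20 seconds for each. A minimal sketch of an alternative, assuming the same topic and broker address as the original code, is to start all the sends first and await them together. The class name BatchProducerSketch is only illustrative, and this needs a reachable broker to run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical class name, used only for this sketch.
public static class BatchProducerSketch
{
    public static async Task BatchProduceAsync(string clientId)
    {
        const string topic = "Batching";
        var config = new ProducerConfig
        {
            BootstrapServers = "172.26.99.250:9092",
            ClientId = clientId,
            LingerMs = TimeSpan.FromSeconds(20).TotalMilliseconds,
            BatchSize = 10 * 1024
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            // Queue every message without awaiting, so they can share a batch.
            var pending = new List<Task<DeliveryResult<string, string>>>();
            for (var i = 0; i < 50; ++i)
            {
                var message = new Message<string, string>
                {
                    Key = clientId,
                    Value = $"Hello World {i}"
                };
                pending.Add(producer.ProduceAsync(topic, message));
            }

            // Completes once all delivery reports have arrived. No Flush call is
            // made here, so the linger timer is left to trigger the send.
            var results = await Task.WhenAll(pending);
            foreach (var r in results)
            {
                Console.WriteLine($"Delivered to {r.TopicPartitionOffset}");
            }
        }
    }

    public static Task Main() => BatchProduceAsync("demo-client");
}
```

If a delivery fails, the corresponding task faults with a ProduceException, which Task.WhenAll rethrows when awaited.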
I believe Flush would cause the messages to be sent immediately. What happens if you call Thread.Sleep(20000) before Flush?
I agree, but I implemented it this way only because @shujaatsiddiqui wants to use this approach.
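For anyone who wants to try the delay-before-Flush idea, here is a rough sketch. It assumes that Flush overrides linger.ms for the duration of the call (my understanding of librdkafka's behaviour, not something confirmed in this thread), so the code waits slightly longer than the linger interval before flushing. It uses the callback-based Produce overload instead of ProduceAsync; the class name LingerThenFlushSketch and the 2-second margin are illustrative choices only, and a reachable broker is required.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical class name, used only for this sketch.
public static class LingerThenFlushSketch
{
    public static async Task Main()
    {
        const string topic = "Batching";
        var linger = TimeSpan.FromSeconds(20);
        var config = new ProducerConfig
        {
            BootstrapServers = "172.26.99.250:9092",
            ClientId = "demo-client",
            LingerMs = linger.TotalMilliseconds,
            BatchSize = 10 * 1024
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            for (var i = 0; i < 50; ++i)
            {
                var message = new Message<string, string>
                {
                    Key = "demo-client",
                    Value = $"Hello World {i}"
                };

                // Produce only enqueues the message; the handler runs later,
                // when the delivery report for that message arrives.
                producer.Produce(topic, message, report =>
                {
                    Console.WriteLine(report.Error.IsError
                        ? $"Delivery failed: {report.Error.Reason}"
                        : $"{DateTime.Now:HH:mm:ss} delivered to {report.TopicPartitionOffset}");
                });
            }

            // Give the linger timer a chance to expire on its own
            // (2 seconds of margin is an arbitrary choice).
            await Task.Delay(linger + TimeSpan.FromSeconds(2));

            // Safety net before Dispose: wait for anything still in flight.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

Comparing the timestamps printed by the delivery handler with the time the loop ran should show whether the batch was held for the linger interval.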