confluent-kafka-dotnet icon indicating copy to clipboard operation
confluent-kafka-dotnet copied to clipboard

Bulk Insertion in Kafka .Net Client not waiting for specific time interval

Open shujaatsiddiqui opened this issue 3 years ago • 3 comments
trafficstars

Description

I have a simple Kafka producer. I am trying to send message in bulk after a specific interval of time but it is sending the data to the Kafka as soon as the code call the produceAsync method and not waiting for the specific interval. As per my understanding it should wait for 20 seconds before producing the data.

Code

public async Task BatchProduceAsync(string clientId)
    {
        string topic = "Batching";
        var config = new ProducerConfig
        {
            BootstrapServers = "172.26.99.250:9092",
            ClientId = clientId,

            //linger.ms refers to the time to wait before sending messages out to Kafka. It defaults to 0, which the system interprets as 'send messages as soon as they are ready to be sent'
            LingerMs = TimeSpan.FromSeconds(20).TotalMilliseconds,

            // batch.size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker.
            // Set this as high as possible, without exceeding available memory
            BatchSize = 10 * 1024

            // In batch either what ever occurs first either ms or batch size, kafka producer will send the message

        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            for (var i = 0; i < 50; ++i)
            {
                var value = $"Hello World {i}";

                var message = new Message<string, string>()
                {
                    Value = value,
                    Key = clientId
                };
                var result = producer.ProduceAsync(topic, message);
            }
            producer.Flush();

        }
    }

shujaatsiddiqui avatar Jul 15 '22 17:07 shujaatsiddiqui

Hi @shujaatsiddiqui , I've Checked your code , it's totally ok but the only issue with your code is you didn't use await in your ProduceAsync method so you should use this if your class is async

var result =  await producer.ProduceAsync(topic, message);

or if your class isn't async use this

var result =  producer.ProduceAsync(topic, message).GetAwaiter().GetResult();

then your code works correctly!

amirvalhalla avatar Jul 19 '22 08:07 amirvalhalla

i believe flush would cause the messages to be sent immediately. what happens if you Thread.Sleep(20000) before flush?

mhowlett avatar Aug 07 '22 14:08 mhowlett

i believe flush would cause the messages to be sent immediately. what happens if you Thread.Sleep(20000) before flush?

I Agree , but I just implemented this approach because @shujaatsiddiqui wants to use this way.

amirvalhalla avatar Aug 07 '22 14:08 amirvalhalla