confluent-kafka-dotnet

Consume multiple messages

Open arturaslt opened this issue 5 years ago • 14 comments

Question: is it possible to read multiple messages / a stream of bytes from a Kafka topic at once?

Right now I can't find any information about consuming a byte array / multiple messages at once, and consuming each message individually takes a lot of time.

If that isn't possible, what would be the best way to consume a large amount of data (50 GB) each day?

arturaslt avatar Jan 16 '20 12:01 arturaslt

The API provides you messages one at a time, but this is from an internal queue on the client, and behind the scenes there is a lot going on to ensure high throughput from the brokers. The client will very easily handle 50Gb/day (this is a small amount of data in Kafka terms).

mhowlett avatar Jan 16 '20 16:01 mhowlett

You could accumulate messages in an internal list and process them after x seconds. We use a timer and trigger processing of the collected messages when the timer elapses. Our motivation for batching is to perform DB operations in batches.
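A minimal sketch of that timer-style accumulate-and-flush loop, assuming the Confluent.Kafka client; `saveBatchToDb` stands in for whatever batch DB write you actually do, and the sizes/intervals are illustrative only:

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class BatchingLoop
{
    private const int MaxBatchSize = 500;
    private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);

    public static void Run(
        IConsumer<string, string> consumer,
        Action<List<ConsumeResult<string, string>>> saveBatchToDb)
    {
        var batch = new List<ConsumeResult<string, string>>();
        var lastFlush = DateTime.UtcNow;

        while (true)
        {
            // Short timeout so the flush interval is checked regularly.
            var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
            if (result?.Message != null)
                batch.Add(result);

            var intervalElapsed = DateTime.UtcNow - lastFlush >= FlushInterval;
            if (batch.Count >= MaxBatchSize || (intervalElapsed && batch.Count > 0))
            {
                saveBatchToDb(batch);                    // one DB round trip per batch
                consumer.Commit(batch[batch.Count - 1]); // commit only after a successful write
                batch.Clear();
                lastFlush = DateTime.UtcNow;
            }
        }
    }
}
```

Committing after the DB write (with auto-commit disabled) keeps the at-least-once guarantee: a crash before the commit replays the batch.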

ashishbhatia22 avatar Jan 27 '20 15:01 ashishbhatia22

@mhowlett Any plans for adding a ConsumeBatch method to IConsumer? If not, can you validate the implementation provided below? It is based on the assumption that consumer.Consume(TimeSpan.Zero) will not call the broker but only check whether there is something in the internal queue (which involves no I/O-bound operation) and return a message from the internal queue, or null, immediately. Is that assumption correct, and if so, could it change in the future and break this code?

internal static class ConsumerExtensions
{
    public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TimeSpan consumeTimeout, int maxBatchSize)
    {
        var message = consumer.Consume(consumeTimeout);

        if (message?.Message is null)
            return Array.Empty<ConsumeResult<TKey, TValue>>();

        var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };

        while (messageBatch.Count < maxBatchSize)
        {
            message = consumer.Consume(TimeSpan.Zero);
            if (message?.Message is null)
                break;

            messageBatch.Add(message);
        }

        return messageBatch;
    }
}
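For illustration, a hypothetical call site for this extension (topic name, addresses, and the `ProcessBatch` handler are placeholders; manual commits assumed):

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "batch-consumer-demo",
    EnableAutoCommit = false // commit manually after the batch is processed
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");

while (true)
{
    var batch = consumer.ConsumeBatch(TimeSpan.FromSeconds(1), maxBatchSize: 100);
    if (batch.Count == 0)
        continue;

    ProcessBatch(batch); // application-specific handler (placeholder)

    // Commit offsets up to the last consumed result
    // (single-partition assumption, for brevity).
    consumer.Commit(batch.Last());
}
```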

KrzysztofBranicki avatar Apr 07 '20 10:04 KrzysztofBranicki

yep that will work (yes, consume reads from an internal queue, and broker fetch requests happen in background threads).

what is your use-case for requiring a batch of messages?

mhowlett avatar Apr 07 '20 15:04 mhowlett

Batch consumption is not a super common use-case in our system, but it appears in two places. First is the case where we want to also do a batch update on the database based on multiple messages, rather than doing it message by message. Second is when we replicate a topic from one Kafka cluster to a second Kafka cluster in a different AWS region. When replicating, we would like to consume a batch and produce a batch, as that seems optimal performance-wise.

Additional question about consumer.Consume(timeout). When the timeout is greater than zero and we already have messages in the internal queue (filled by the background thread), will it return immediately with whatever is already in the queue, or will it use the provided timeout to try to gather more messages? I think I already know the answer but want to double-check.

KrzysztofBranicki avatar Apr 08 '20 07:04 KrzysztofBranicki

First is the case when we would want to do also batch update on the database based on multiple messages rather than doing it message by message.

makes sense

Second is when we replicate topic from one Kafka cluster to second Kafka cluster in different AWS region. When replicating we would like to consume batch and produce batch as it seems to be most optimal performance wise.

both the producer and consumer batch behind the scenes (and this behavior is configurable) - i don't think you gain anything from doing this yourself as well.
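For reference, that built-in batching can be tuned through standard client settings; the values below are illustrative only, not recommendations:

```csharp
using Confluent.Kafka;

// Producer side: trade a little latency for larger batches.
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    LingerMs = 50,            // wait up to 50 ms to fill larger batches
    BatchNumMessages = 10000  // cap on messages per batch
};

// Consumer side: let broker fetches accumulate more data per request.
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "replicator",
    FetchMinBytes = 1048576,  // ask the broker for at least ~1 MB per fetch...
    FetchWaitMaxMs = 100      // ...but don't wait longer than 100 ms for it
};
```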

Additional question about consumer.Consume(timeout). When the timeout is greater than zero and we already have messages in the internal queue (filled by the background thread), will it return immediately with whatever is already in the queue, or will it use the provided timeout to try to gather more messages?

It will return immediately. fetching of messages from the broker happens in background threads independently of calls to the consume method.

mhowlett avatar Apr 08 '20 15:04 mhowlett

In this replication use-case we need to guarantee at-least-once delivery and unchanged ordering. We produce with Acks.All (min in-sync replicas 2), MaxInFlight 1, and high MessageTimeoutMs and MessageSendMaxRetries. We essentially can't produce the next message until the current one is confirmed committed by the broker. If we treated it more like batches, we could at least parallelize per partition, since nothing guarantees ordering between partitions anyway.
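One way that per-partition parallelization could be sketched (this is an assumption-laden fragment: `producer`, `batch` — a consumed list of ConsumeResult — and `targetTopic` are presumed to exist, and the producer is configured as described above):

```csharp
// Group the consumed batch by source partition.
var byPartition = batch.GroupBy(r => r.Partition.Value);

// Partitions proceed concurrently; within each partition every produce is
// awaited before the next, preserving per-partition ordering and
// at-least-once semantics.
await Task.WhenAll(byPartition.Select(async group =>
{
    foreach (var result in group)
    {
        await producer.ProduceAsync(targetTopic, result.Message);
    }
}));
```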

KrzysztofBranicki avatar Apr 10 '20 14:04 KrzysztofBranicki

It would be useful to implement the parallel consumer in .NET as described here: https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

For the moment we can get by with the workaround described by Krzysztof.

Abuntxa avatar Jan 28 '22 14:01 Abuntxa


@mhowlett Any plans for adding ConsumeBatch method to IConsumer? If not then can you validate implementation provided below? [the ConsumeBatch extension quoted verbatim from above]

I was wondering whether this solution still works?

I tried using it, but specifying a timeout of zero stops the consumer from detecting any new messages.

allprogrammers avatar May 18 '22 11:05 allprogrammers

Looks fine, though you could do it without the first call to Consume. There is nothing about this code that will stop the consumer from pulling messages from Kafka; it will simply return immediately if the consumer hasn't consumed any messages yet.

mhowlett avatar May 18 '22 12:05 mhowlett

@mhowlett thank you for the reply. Can you please have a look here and see if you can help?

https://stackoverflow.com/questions/72288344/confluent-batch-consumer-consumer-not-working-if-time-out-is-specified

allprogrammers avatar May 18 '22 13:05 allprogrammers

Hi,

I have used the same approach (consume with TimeSpan.Zero) to consume batches (until there are no more messages or max-batch-size is reached) and everything worked fine.

I needed to consume in batches in order to build a wrapper around the client that offers parallel consumption while still providing key ordering and at-least-once guarantees, as described in the Confluent post linked above.

Nevertheless, the pattern of a single Consume call with a timeout followed by iterative Consume calls with TimeSpan.Zero produced odd behaviour: it always waited out the whole first timeout instead of returning control as soon as any message was available.

We had to change the configuration to always consume with a zero timeout (including the first call). We then controlled the loop with an external setting that throttles the poll cycle: if the loop would re-enter the poll cycle before the configured minimum interval has elapsed, it waits until that interval is fulfilled. This avoids maxing out the CPU with unnecessary poll cycles when there is nothing left to process.
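A sketch of that zero-timeout loop with poll throttling; `minPollInterval` is the external setting, while `ProcessBatch` and `maxBatchSize` are placeholders:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

var minPollInterval = TimeSpan.FromMilliseconds(200); // illustrative value

while (true)
{
    var cycleStart = DateTime.UtcNow;

    var batch = new List<ConsumeResult<string, string>>();
    while (batch.Count < maxBatchSize)
    {
        var result = consumer.Consume(TimeSpan.Zero); // never blocks
        if (result?.Message == null)
            break;
        batch.Add(result);
    }

    if (batch.Count > 0)
        ProcessBatch(batch);

    // Throttle: if this cycle finished early, sleep the remainder so an
    // empty queue does not turn the loop into a busy-wait.
    var elapsed = DateTime.UtcNow - cycleStart;
    if (elapsed < minPollInterval)
        Thread.Sleep(minPollInterval - elapsed);
}
```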

Abuntxa avatar May 18 '22 14:05 Abuntxa

@Abuntxa can you please give some code examples? Also, I don't understand what you mean by throttling here.

allprogrammers avatar May 19 '22 07:05 allprogrammers

@mhowlett, interesting that the background thread keeps pulling messages irrespective of calls to the Consume method.

BTW, could you share some insight on the questions below?

  1. What first triggers buffering of messages into the local queue?

  2. Is there any way to stop the background thread from fetching?

  3. Is there any way for the consumer application to know how many messages are available in the local queue?

  4. Does the Pause method purge whatever the background thread has collected in the local queue/buffer? (Say 100 messages have been read via the Consume method and the consumer is then paused. The background thread has kept pulling messages from the broker and buffering them locally, say 1000 messages in the local queue. If pausing the consumer purges 900 messages from the local queue, will it start from message 51 on resuming? Assume AutoCommit and AutoOffsetStore are false.)

  5. Does pausing the consumer stop the background thread from collecting messages from the broker?

ksdvishnukumar avatar Sep 30 '23 19:09 ksdvishnukumar