
How to restrict the number of messages while consuming


Description

I have an application built on .NET Core 6.0, with a wrapper around the Confluent.Kafka package (2.1.1). Our logic consumes messages as a batch: it accumulates messages (say 25 per batch), and once the batch size is reached it pauses the consumer, processes the batch, and signals a manual commit. The commit is performed before resuming the consumer. A sketch of this loop is shown below.
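
A minimal sketch of the loop, assuming a string-keyed topic named "my-topic" and a hypothetical ProcessBatch step (neither is part of Confluent.Kafka; the broker address is a placeholder):

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

const int BatchSize = 25;
var batch = new List<ConsumeResult<string, string>>(BatchSize);

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed placeholder
    GroupId = "test",
    EnableAutoCommit = false,
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");

while (true)
{
    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
    if (result == null) continue;

    batch.Add(result);
    if (batch.Count < BatchSize) continue;

    // Pause fetching on all assigned partitions while the batch is processed.
    consumer.Pause(consumer.Assignment);

    ProcessBatch(batch);        // hypothetical processing step
    consumer.Commit(batch[^1]); // manual commit of the last consumed offset

    consumer.Resume(consumer.Assignment);
    batch.Clear();
}

static void ProcessBatch(List<ConsumeResult<string, string>> messages)
{
    // application-specific work
}
```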

As I understand it, when the Consume method on Confluent.Kafka's IConsumer is called, polling starts: the fetcher sends requests to the broker, the fetched messages are placed into an internal queue/buffer, and from there they are delivered to the client. When the consumer is paused, that internal buffer/queue gets cleared, which leads to re-reading the same messages after resuming.

I see a couple of configuration options to restrict fetching by number of bytes or kilobytes (a sketch of setting them follows this list):

  1. QueuedMaxMessagesKbytes: as per my understanding, this is the size of the internal queue/buffer for pre-fetched messages.
  2. FetchMaxBytes: as per my understanding, this is the combined limit across all partitions for a single fetch request on one consumer connection.
  3. MaxPartitionFetchBytes: as per my understanding, this is the per-partition limit.
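
For reference, a minimal sketch of setting these three options via the strongly typed ConsumerConfig (the broker address and group id are placeholders; note that QueuedMaxMessagesKbytes is specified in kilobytes while the other two are in bytes):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed placeholder
    GroupId = "test",
    EnableAutoCommit = false,

    QueuedMaxMessagesKbytes = 5 * 1024,       // 5 MB local queue, value in KB
    FetchMaxBytes = 10 * 1024 * 1024,         // 10 MB per fetch request
    MaxPartitionFetchBytes = 5 * 1024 * 1024, // 5 MB per partition
};
```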

The consumed topic has 1 partition with 1000 messages of ~500 KB each. Even though we set the limits (say QueuedMaxMessagesKbytes to 5 MB, FetchMaxBytes to 10 MB, and MaxPartitionFetchBytes to 5 MB), every time I receive 25 messages (25 × 500 KB = 12.5 MB). But since I am limiting QueuedMaxMessagesKbytes and MaxPartitionFetchBytes to 5 MB, I would expect at most 10 messages in the queue. Moreover, I am pausing the consumer as well.

One more interesting point: I read in another GitHub thread that passing a timeout value on the first call to Consume pulls messages from the broker, and subsequent calls with TimeSpan.Zero return messages immediately from the internal queue. With this approach I was not able to commit when auto-commit is disabled; it works only when EnableAutoCommit is set to true. So I prefer to use the cancellation-token-based Consume overload together with CancellationDelayMaxMs; I see that this property is used internally as the timeout passed to Consume. Both patterns are sketched below.
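
For clarity, a minimal sketch of the two patterns (the topic name and broker address are assumed placeholders):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed placeholder
    GroupId = "test",
    EnableAutoCommit = false,
    CancellationDelayMaxMs = 100, // internal per-iteration timeout for the token overload
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");

// Pattern 1: the first call blocks up to the timeout and triggers fetching;
// later calls with TimeSpan.Zero only drain what is already queued locally.
var first = consumer.Consume(TimeSpan.FromSeconds(1));
var queued = consumer.Consume(TimeSpan.Zero);

// Pattern 2: the token-based overload blocks in increments of
// CancellationDelayMaxMs until a message arrives or the token is cancelled.
using var cts = new CancellationTokenSource();
var result = consumer.Consume(cts.Token);
```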

THIS IS NOT AN ISSUE, BUT I WANTED TO UNDERSTAND THE BEHAVIOR.

Could you please clarify whether my understanding is right, and how to achieve this behavior?

How to reproduce

Any simple consume loop can reproduce this.

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. - Not Needed
  • [x] Confluent.Kafka nuget version. - 2.1.1
  • [x] Apache Kafka version. - Azure Event Hubs
  • [x] Client configuration. - GroupId="test", AutoOffsetStore=true, EnableAutoCommit=false, QueuedMaxMessagesKbytes=5 MB, FetchMaxBytes=10 MB, MaxPartitionFetchBytes=5 MB
  • [x] Operating system. - Ubuntu in AKS
  • [x] Provide logs (with "debug" : "..." as necessary in configuration).
  • [x] Provide broker log excerpts.
  • [x] Critical issue.

ksdvishnukumar · Apr 03 '24 07:04

Hi @mhowlett, @edenhill,

Is there any workaround for this that can be implemented via the consumer configuration?

ksdvishnukumar · Apr 11 '24 05:04