Document how to handle long message processing with max_poll_interval_ms

Open butla opened this issue 3 years ago • 3 comments

Describe the solution you'd like

I think it'd be good to give a clear warning in the docs about long message processing and to focus people's attention on max_poll_interval_ms. Maybe another section, "long processing", in here would be good.

The problem that I had

  • A message was taking a long time to process.
  • After 5 minutes, the consumer doing the processing left the group.
  • The group rebalanced.
  • Another consumer got the same message to process.
  • The original consumer finished processing (after 7 minutes) and tried to commit the offset, but the commit failed with UnknownMemberID. That failure also caused a rebalance of the whole group, which prevented the second consumer from committing the message.
  • That looped forever.
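
The sequence above can be illustrated with a toy model. This is only a sketch of the reported behaviour, not aiokafka code: the function name `simulate_redelivery`, the consumer labels, and the sequential hand-off between consumers are all assumptions made for illustration (in the real incident the failed commit overlapped with the second consumer's processing).

```python
from typing import List


def simulate_redelivery(
    processing_s: float,
    max_poll_interval_s: float,
    max_attempts: int = 3,
) -> List[str]:
    """Toy timeline of one message handed from consumer to consumer.

    Hypothetical model: each consumer either finishes within the poll
    interval and commits, or is dropped from the group first, so its
    late commit fails and the message is redelivered.
    """
    events: List[str] = []
    for attempt in range(1, max_attempts + 1):
        consumer = f"consumer-{attempt}"
        events.append(f"{consumer}: starts processing")
        if processing_s <= max_poll_interval_s:
            events.append(f"{consumer}: commits offset")
            return events
        # The consumer did not poll in time, so it leaves the group.
        events.append(
            f"{consumer}: leaves group after {max_poll_interval_s:g}s; rebalance"
        )
        # It still finishes the work, but the commit is rejected.
        events.append(f"{consumer}: commit fails after {processing_s:g}s")
    events.append("gave up: message never committed")
    return events


if __name__ == "__main__":
    # 7 minutes of processing against a 5 minute poll interval.
    for line in simulate_redelivery(processing_s=420, max_poll_interval_s=300):
        print(line)
```

With 420 seconds of processing and a 300 second interval, no attempt ever reaches the commit step, which matches the endless loop described in the list.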

My understanding of max_poll_interval_ms

The number isn't sent to Kafka. It's only used internally in aiokafka to leave the consumer group if the consumer hasn't polled for messages in a while, which suggests it's stuck.

Is that correct?

butla, Jul 21 '22 14:07

I have the same issue.

My understanding of max_poll_interval_ms

The number isn't sent to Kafka. It's only used internally in aiokafka to leave the consumer group if the consumer didn't poll for messages in a while, suggesting it's stuck.

Is that correct?

If I'm reading KIP-62 correctly, then your interpretation should be correct:

We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. We call this new timeout as the "process timeout" and expose it in the consumer's configuration as max.poll.interval.ms. This config sets the maximum delay between client calls to poll(). When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request. As soon as the consumer resumes processing with another call to poll(), the consumer will rejoin the group.
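
To make the quoted mechanism concrete, here is a toy model of a locally enforced poll deadline. It is a sketch only: the class `PollTimer` and its method names are hypothetical and do not reflect how aiokafka or the Java client implement the check. Time is passed in explicitly so the behaviour is easy to follow.

```python
class PollTimer:
    """Toy model of a client-side poll deadline (not a real client API)."""

    def __init__(self, max_poll_interval_ms: int, now_ms: int = 0) -> None:
        self.max_poll_interval_ms = max_poll_interval_ms
        self.last_poll_ms = now_ms

    def poll(self, now_ms: int) -> None:
        # Every call to poll() restarts the countdown.
        self.last_poll_ms = now_ms

    def should_leave_group(self, now_ms: int) -> bool:
        # True once the gap since the last poll exceeds the limit. Per the
        # KIP-62 text, this is when the client stops heartbeating and
        # sends an explicit LeaveGroup request.
        return now_ms - self.last_poll_ms > self.max_poll_interval_ms


if __name__ == "__main__":
    timer = PollTimer(max_poll_interval_ms=300_000)
    print(timer.should_leave_group(now_ms=299_000))  # still within the limit
    print(timer.should_leave_group(now_ms=420_000))  # 7 minutes without a poll
```

The point of the model is that the deadline is evaluated on the client, using only the time of the last poll, which is consistent with the interpretation that the value is enforced locally.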

So for clients that expect to take a long time processing a single message, max_poll_interval_ms should be set to at least the longest time required for any single message. However, does it need to be set any higher? For example, if I async-iterate the consumer, does the consumer poll every time a single message is retrieved? Or does the consumer prefetch multiple messages, so that the timeout triggers unless I process all messages in the prefetched batch within max_poll_interval_ms?
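
As a rough illustration of the first point, a consumer could be configured along these lines. This is a minimal sketch under several assumptions: the helper `poll_interval_ms`, its `headroom` factor, the `handle` callback, and the broker address `localhost:9092` are all hypothetical, and the headroom is a guess rather than a recommendation. It does not settle the prefetching question above.

```python
import asyncio
from typing import Any, Awaitable, Callable


def poll_interval_ms(longest_processing_s: float, headroom: float = 2.0) -> int:
    """Hypothetical helper: poll interval sized from the slowest message."""
    return int(longest_processing_s * headroom * 1000)


async def consume(
    topic: str,
    group_id: str,
    handle: Callable[[Any], Awaitable[None]],
    longest_processing_s: float,
) -> None:
    # Imported lazily so the helper above works without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers="localhost:9092",  # assumed broker address
        group_id=group_id,
        enable_auto_commit=False,
        # Allow more time between polls than the slowest message needs.
        max_poll_interval_ms=poll_interval_ms(longest_processing_s),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            await handle(msg)  # may take minutes
            await consumer.commit()
    finally:
        await consumer.stop()


if __name__ == "__main__":
    # 7 minutes per message with 2x headroom gives a 14 minute interval.
    print(poll_interval_ms(7 * 60))
```

Committing manually after each message keeps the committed offset aligned with finished work, at the cost of one commit round trip per message.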

twisteroidambassador, Sep 16 '22 05:09