aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

Warn if requested topic partitions has no overlap with assignment

Open ian-sentropy opened this issue 5 years ago • 1 comments

Describe the solution you'd like When using AIOKafkaConsumer.getone() with a list of topic partitions, it may be nice to log a warning if there is no overlap between the requested topic partitions and the assignment. A simple example where the error is obvious -

from aiokafka import TopicPartition
consumer = AIOKafkaConsumer("foo")
await consumer.start()

try:
    await consumer.getone(TopicPartition("bar", 0))  # This will hang forever with no warning 
finally:
    await consumer.stop()

A less obvious scenario is if we have multiple consumers of the same consumer group requesting partitions programmatically, say based on lag, similar to the consumption flow control example. Using getone(partitions) can cause consumers to hang in the event of a group rebalance if the set of requested partitions are reassigned away from the consumer.

A concrete example, say we have topic foo with 9 partitions and 3 consumers (A, B, and C). Consumer A has partitions 0-2, consumer B has partitions 3-5, and consumer C has partitions 6-8.

  1. Consumer A makes a getone call for partition 0 based on offset lag
  2. Before a message is fetched, consumer C dies, triggering a rebalance
  3. In the rebalance, partition 0 is reassigned from consumer A to consumer B.
  4. Consumer A's call to getone will hang forever because partition 0 is no longer part of its assignment

ian-sentropy avatar Oct 14 '20 03:10 ian-sentropy

I'm unsure of a good way to implement the above functionality with getone without the risk of hanging. I'm thinking of using getmany with max_records=1, using the built-in timeout to exit the getmany call, allowing for another call with an updated list of partitions post rebalance.

ian-sentropy avatar Oct 14 '20 03:10 ian-sentropy