aiokafka
Warn if requested topic partitions have no overlap with assignment
Describe the solution you'd like
When using AIOKafkaConsumer.getone() with a list of topic partitions, it would be helpful to log a warning if there is no overlap between the requested topic partitions and the consumer's current assignment. A simple example where the error is obvious:
import asyncio
from aiokafka import AIOKafkaConsumer, TopicPartition

async def main():
    consumer = AIOKafkaConsumer("foo")
    await consumer.start()
    try:
        # "bar" is not part of the assignment, so this hangs forever with no warning
        await consumer.getone(TopicPartition("bar", 0))
    finally:
        await consumer.stop()

asyncio.run(main())
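To make the request concrete, here is a minimal sketch of the kind of check I have in mind. The helper name warn_if_no_overlap is hypothetical and not part of aiokafka; it only assumes that the current assignment is available as a set of TopicPartition values, as returned by consumer.assignment().

```python
import logging

log = logging.getLogger(__name__)


def warn_if_no_overlap(requested, assigned):
    """Log a warning when none of the requested partitions are assigned.

    Hypothetical helper, not an aiokafka API. Returns True if a warning
    was logged, False otherwise.
    """
    requested = set(requested)
    assigned = set(assigned)
    # An empty request means "any assigned partition", so there is nothing to warn about.
    if requested and requested.isdisjoint(assigned):
        log.warning(
            "Requested partitions %s have no overlap with assignment %s; "
            "the call may block indefinitely",
            sorted(requested),
            sorted(assigned),
        )
        return True
    return False
```

A caller could run warn_if_no_overlap(partitions, consumer.assignment()) right before await consumer.getone(*partitions). This only catches a mismatch at call time, not a rebalance that happens while the call is already waiting.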
A less obvious scenario is when multiple consumers in the same consumer group request partitions programmatically, say based on lag, similar to the consumption flow control example. Using getone(*partitions) can cause a consumer to hang after a group rebalance if the requested partitions are reassigned away from it.
As a concrete example, say we have topic foo with 9 partitions and 3 consumers (A, B, and C). Consumer A has partitions 0-2, consumer B has partitions 3-5, and consumer C has partitions 6-8.
- Consumer A makes a getone call for partition 0 based on offset lag.
- Before a message is fetched, consumer C dies, triggering a rebalance.
- In the rebalance, partition 0 is reassigned from consumer A to consumer B.
- Consumer A's call to getone will hang forever because partition 0 is no longer part of its assignment.
I'm unsure of a good way to implement the above flow with getone without the risk of hanging. I'm thinking of using getmany with max_records=1 instead, relying on its built-in timeout to exit the call so that the next call can use an updated list of partitions after a rebalance.
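A rough sketch of that workaround, under a few assumptions: get_one_with_timeout and pick_partitions are hypothetical names, the consumer only needs to expose assignment() and getmany(*partitions, timeout_ms=..., max_records=...), and I'm assuming max_records=1 caps the total number of records returned by a single call.

```python
import asyncio


async def get_one_with_timeout(consumer, pick_partitions, timeout_ms=1000):
    """Return (partition, message) for one record, re-picking partitions on timeout.

    pick_partitions is a hypothetical callback that receives the current
    assignment and returns the partitions to read from (e.g. based on lag).
    """
    while True:
        assigned = set(consumer.assignment())
        # Only ask for partitions this consumer still owns.
        wanted = set(pick_partitions(assigned)) & assigned
        if not wanted:
            # Nothing usable right now (e.g. mid-rebalance); wait and retry.
            await asyncio.sleep(timeout_ms / 1000)
            continue
        batches = await consumer.getmany(
            *wanted, timeout_ms=timeout_ms, max_records=1
        )
        for tp, messages in batches.items():
            if messages:
                return tp, messages[0]
        # Timed out with no data: loop again with a fresh assignment.
```

The wait is bounded by timeout_ms per iteration, so a rebalance that moves the requested partitions away costs at most one timeout before the partition list is refreshed.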