aiokafka
A max_records_per_partition parameter for Consumer.getmany
Consumer.getmany already has a max_records parameter to limit the number of messages returned, but nothing to limit the number of records per partition. I've implemented a max_records_per_partition parameter to provide such a limit: https://github.com/aio-libs/aiokafka/compare/master...aure-olli:max_records_per_partition
My use case is my topic merger: I have to keep one message per partition in order to compare them, and making multiple calls to Consumer.getone means many useless "if partitions and tp not in partitions:" tests (https://github.com/aure-olli/aiokafka/blob/master/test_merge.py#L312:L322).
Using max_records_per_partition as a dict (with one limit for each partition) could probably cover even more cases.
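A minimal sketch of what the proposed parameter could mean when it accepts either a single int (the same cap for every partition) or a dict (one cap per partition, None meaning no cap). Everything here is hypothetical: TopicPartition is a local stand-in for aiokafka's (topic, partition) namedtuple, limit_for and cap_per_partition are not part of aiokafka, and treating partitions missing from the dict as unlimited is an assumption. Records beyond the cap are returned as leftover, because a real consumer would have to keep them buffered rather than drop them.

```python
from collections import namedtuple
from typing import Dict, List, Optional, Union

# Local stand-in for aiokafka.structs.TopicPartition, which is also a (topic, partition) namedtuple.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

# Hypothetical type of the proposed parameter: no limit, one int for all, or one limit per partition.
Limit = Union[None, int, Dict[TopicPartition, Optional[int]]]


def limit_for(tp: TopicPartition, max_records_per_partition: Limit) -> Optional[int]:
    """Return the cap for one partition; None means "no limit"."""
    if max_records_per_partition is None or isinstance(max_records_per_partition, int):
        return max_records_per_partition
    # Dict form: partitions missing from the dict are treated as unlimited (an assumption).
    return max_records_per_partition.get(tp)


def cap_per_partition(fetched: Dict[TopicPartition, List], max_records_per_partition: Limit):
    """Split fetched records into (returned, leftover) according to the per-partition caps.

    A real consumer would keep `leftover` buffered so the next call returns those records.
    """
    returned, leftover = {}, {}
    for tp, records in fetched.items():
        cap = limit_for(tp, max_records_per_partition)
        head = records if cap is None else records[:cap]
        if head:
            returned[tp] = head
        if len(head) < len(records):
            leftover[tp] = records[len(head):]
    return returned, leftover


if __name__ == "__main__":
    tp0, tp1 = TopicPartition("a", 0), TopicPartition("a", 1)
    fetched = {tp0: [1, 2, 3], tp1: [10, 11]}
    print(cap_per_partition(fetched, 1))
    print(cap_per_partition(fetched, {tp0: 2, tp1: None}))
```

With a cap of 1 per partition, this is the "one message per partition" behaviour the merger needs, without filtering on the caller's side.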
Would calling getmany() N times with different max_records params not yield a similar result? I feel like it's quite a minor optimization.
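The workaround suggested here can be sketched as follows, assuming getmany() keeps its aiokafka signature getmany(*partitions, timeout_ms=0, max_records=None) and returns a {TopicPartition: [records]} dict. getmany_per_partition and FakeConsumer are hypothetical helpers for illustration; the fake only mimics the global max_records cap over an in-memory list and does no real fetching.

```python
import asyncio
from typing import Dict


async def getmany_per_partition(consumer, limits: Dict, timeout_ms: int = 0) -> Dict:
    """Call consumer.getmany() once per partition, each call with its own max_records.

    Assumes the aiokafka-style signature getmany(*partitions, timeout_ms=..., max_records=...).
    """
    result = {}
    for tp, limit in limits.items():
        batch = await consumer.getmany(tp, timeout_ms=timeout_ms, max_records=limit)
        result.update(batch)
    return result


class FakeConsumer:
    """Hypothetical in-memory stand-in for AIOKafkaConsumer, for illustration only."""

    def __init__(self, data):
        self._data = {tp: list(records) for tp, records in data.items()}

    async def getmany(self, *partitions, timeout_ms=0, max_records=None):
        out = {}
        remaining = max_records  # global cap shared by all requested partitions
        for tp in partitions or list(self._data):
            records = self._data.get(tp, [])
            n = len(records) if remaining is None else min(remaining, len(records))
            if n:
                out[tp] = records[:n]
                del records[:n]  # consumed records are not returned again
            if remaining is not None:
                remaining -= n
        return out


if __name__ == "__main__":
    consumer = FakeConsumer({"tp0": [1, 2, 3], "tp1": [10, 11]})
    print(asyncio.run(getmany_per_partition(consumer, {"tp0": 1, "tp1": None})))
```

One cost of this approach: the calls run one after another, so with a non-zero timeout_ms each idle partition can add its own wait, up to roughly N times timeout_ms in total.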
The problem with max_records_per_partition is that it's not really clear whether we should block or not when we don't have enough records. With max_records, the rules are at least simple enough to work out.
Ok, judging from the change set, it doesn't look too bad. An alternative would be to keep the same max_records parameter but add dict-like support. A PR would be great :)
For your first question: my understanding of getmany is that it returns as soon as there are messages fitting the conditions, whether or not max_records is reached. So a potential max_records_per_partition parameter should behave the same way.
I also like the idea of just using max_records, but what if you want both constraints, a per-partition limit and a global limit? And here is another question: shouldn't passing a {partition: limit} dict be an alternative way of passing the list of allowed partitions? While we're at it, shouldn't it be allowed to pass a list or a set of partitions?
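One possible answer to the "both constraints" question, as a sketch: apply the per-partition caps first, then spend the global max_records budget across partitions. The function name take_records, the plain strings used as partition keys, and filling the budget in dict iteration order are all assumptions for illustration; a real implementation might prefer a fairer order such as round-robin.

```python
from typing import Dict, List, Optional


def take_records(
    fetched: Dict[str, List],
    max_records: Optional[int] = None,
    max_records_per_partition: Optional[Dict[str, Optional[int]]] = None,
) -> Dict[str, List]:
    """Apply the per-partition caps first, then a global cap across all partitions.

    Partitions missing from max_records_per_partition are unlimited. The global budget
    is spent in the dict's iteration order, an arbitrary choice for this sketch.
    """
    per_partition = max_records_per_partition or {}
    budget = max_records  # None means no global limit
    result = {}
    for tp, records in fetched.items():
        if budget is not None and budget <= 0:
            break  # global limit reached, remaining partitions get nothing
        cap = per_partition.get(tp)
        n = len(records) if cap is None else min(cap, len(records))
        if budget is not None:
            n = min(n, budget)
            budget -= n
        if n:
            result[tp] = records[:n]
    return result


if __name__ == "__main__":
    fetched = {"tp1": [1, 2, 3, 4], "tp2": [5, 6, 7], "tp3": [8, 9]}
    print(take_records(fetched, max_records=5, max_records_per_partition={"tp1": 3, "tp2": None}))
```

Under this ordering, tp1 contributes its 3 allowed records and tp2 gets the 2 left in the global budget, so tp3 gets nothing even though it has no per-partition cap.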
So I guess we need to agree on which calls are allowed and what they do.
# should it only get from tp1?
consumer.getmany(tp1, max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# should it also get from tp3, without any limit?
consumer.getmany(tp1, tp2, tp3, max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# should it only get from tp1 and tp2? Or also from every other partition, without any limit?
consumer.getmany(max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# clearer and simpler, but creates another syntax
consumer.getmany({tp1: 3, tp2: None}, max_records=5)
# probably also allowed if the previous one is, but is that too many syntaxes?
consumer.getmany([tp1, tp2], max_records=5)
# should this apply to every similar method too?
consumer.seek_to_end([tp1, tp2])
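A sketch of how getmany() could normalize the positional forms listed above into a single {partition: limit} mapping. normalize_partitions is a hypothetical helper, not aiokafka code, and TopicPartition is a local stand-in for aiokafka's namedtuple. Because a TopicPartition is itself a tuple, tuples are deliberately not unpacked: only lists and sets are treated as collections of partitions.

```python
from collections import namedtuple
from typing import Dict, Optional

# Local stand-in for aiokafka.structs.TopicPartition.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def normalize_partitions(*partitions) -> Dict[TopicPartition, Optional[int]]:
    """Turn the proposed getmany() positional forms into a {tp: limit} dict.

    Hypothetical accepted forms:
      getmany(tp1, tp2)             -> {tp1: None, tp2: None}
      getmany([tp1, tp2]) or a set  -> {tp1: None, tp2: None}
      getmany({tp1: 3, tp2: None})  -> {tp1: 3, tp2: None}
    An empty result would mean "all assigned partitions", as with getmany() today.
    """
    if len(partitions) == 1 and isinstance(partitions[0], dict):
        mapping = dict(partitions[0])
    elif len(partitions) == 1 and isinstance(partitions[0], (list, set, frozenset)):
        # Tuples are excluded on purpose: a single TopicPartition is a tuple too.
        mapping = dict.fromkeys(partitions[0])
    else:
        mapping = dict.fromkeys(partitions)
    for tp in mapping:
        if not isinstance(tp, TopicPartition):
            raise TypeError(f"expected TopicPartition, got {type(tp).__name__}")
    return mapping


if __name__ == "__main__":
    tp1, tp2 = TopicPartition("t", 1), TopicPartition("t", 2)
    print(normalize_partitions({tp1: 3, tp2: None}))
    print(normalize_partitions([tp1, tp2]))
```

If the list/set form were adopted, the same kind of helper could back methods such as seek_to_end() as well, which would keep the accepted syntaxes consistent across the consumer.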
I think having consumer.getmany(max_records=5, max_records_per_partition={tp1: 3, tp2: None}) fetch from every partition is the simplest option to implement without changing the syntax.
But consumer.getmany({tp1: 3, tp2: None}, max_records=5) is also the most intuitive.
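The difference between the two interpretations can be sketched with plain strings standing in for TopicPartition objects: with the keyword form, every assigned partition is fetched and the dict only caps the partitions it lists; with the positional dict, the dict also selects which partitions are fetched. fetch_plan is a hypothetical helper, and ignoring dict entries for partitions outside the selection is an assumption.

```python
from typing import Dict, Iterable, Optional


def fetch_plan(
    assignment: Iterable[str],
    *partitions,
    max_records_per_partition: Optional[Dict[str, Optional[int]]] = None,
) -> Dict[str, Optional[int]]:
    """Return {partition: limit} describing what a getmany() call would fetch (None = no limit)."""
    if len(partitions) == 1 and isinstance(partitions[0], dict):
        # Positional dict: it selects the partitions *and* sets their limits.
        return dict(partitions[0])
    # Keyword form: explicit partitions if given, otherwise every assigned partition.
    selected = partitions or tuple(assignment)
    limits = max_records_per_partition or {}
    # Dict entries for partitions outside `selected` are ignored.
    return {tp: limits.get(tp) for tp in selected}


assignment = ["tp1", "tp2", "tp3"]
print(fetch_plan(assignment, max_records_per_partition={"tp1": 3, "tp2": None}))
# {'tp1': 3, 'tp2': None, 'tp3': None}
print(fetch_plan(assignment, {"tp1": 3, "tp2": None}))
# {'tp1': 3, 'tp2': None}
```

Under this reading, the keyword form never narrows the set of partitions, so existing getmany() calls keep their current meaning.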