A max_records_per_partition parameter for Consumer.getmany

Open aure-olli opened this issue 6 years ago • 3 comments

Consumer.getmany already has a max_records parameter to limit the total number of messages returned, but nothing to limit the number of records per partition. I've implemented a max_records_per_partition parameter to provide such a limit: https://github.com/aio-libs/aiokafka/compare/master...aure-olli:max_records_per_partition

My use case is my topic merger: I have to keep one message per partition in order to compare them, and making multiple calls to Consumer.getone means running many useless "if partitions and tp not in partitions:" tests (https://github.com/aure-olli/aiokafka/blob/master/test_merge.py#L312:L322).

The usage of max_records_per_partition as a dict (with one limit for each partition) could probably cover more cases.

aure-olli avatar Jul 02 '19 02:07 aure-olli

Would calling getmany() N times with different max_records params not yield a similar result? I feel like it's quite a minor optimization. The problem with max_records_per_partition is that it's not really clear whether we should block or not when we don't have enough records. With max_records it's at least simple enough to work out the rules.
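The "call getmany() N times" approach could look roughly like the sketch below. The helper name getmany_per_partition and its limits argument are hypothetical and not part of aiokafka; the only library call assumed is the existing getmany(*partitions, timeout_ms=..., max_records=...).

```python
import asyncio


async def getmany_per_partition(consumer, limits, timeout_ms=0):
    """Call consumer.getmany() once per partition with its own max_records.

    `limits` maps each partition to its record limit (None means no limit).
    Hypothetical helper for illustration, not part of aiokafka.
    """
    result = {}
    for tp, limit in limits.items():
        # getmany() returns a dict mapping each partition to a list of
        # records; partitions without data are simply absent.
        batch = await consumer.getmany(
            tp, timeout_ms=timeout_ms, max_records=limit
        )
        result.update(batch)
    return result
```

Note that the timeout applies to each call separately, so with a non-zero timeout_ms the total wait can be up to N times longer than a single getmany() call.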

tvoinarovskyi avatar Jul 06 '19 17:07 tvoinarovskyi

Ok, judging from the change set, it doesn't look too bad. An alternative would be to use the same "max_records", but add dict-like support. A PR would be great)

tvoinarovskyi avatar Jul 06 '19 17:07 tvoinarovskyi

For your first question, my understanding of getmany is that it will return as soon as there are messages fitting the conditions, no matter if max_records is reached or not. So it should have the same behavior with a potential max_records_per_partition parameter.
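To make that behavior concrete, here is a minimal sketch of how both limits could be applied to records that are already buffered, without waiting for more. It is a plain-Python illustration under the assumptions stated in this comment, not aiokafka's actual implementation; select_records and its arguments are hypothetical, and limits are assumed to be non-negative.

```python
def select_records(buffered, max_records=None, max_records_per_partition=None):
    """Pick records from already-buffered data under both limits.

    buffered: dict mapping partition -> list of records already fetched.
    max_records: global cap on returned records (None = no cap).
    max_records_per_partition: dict mapping partition -> cap
        (a missing key or None means no cap for that partition).
    """
    per_partition = max_records_per_partition or {}
    remaining = max_records
    selected = {}
    for tp, records in buffered.items():
        if remaining is not None and remaining <= 0:
            break  # global limit reached
        cap = per_partition.get(tp)
        if cap is not None and remaining is not None:
            take = min(cap, remaining)
        elif cap is not None:
            take = cap
        else:
            take = remaining  # may be None, meaning "take everything"
        chunk = records[:take]
        if chunk:
            selected[tp] = chunk
            if remaining is not None:
                remaining -= len(chunk)
    return selected
```

The function returns whatever fits the constraints right now, so a partition with fewer records than its cap never causes blocking.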

I also like the idea of just using max_records, but what if you want both constraints: a limit per partition and a global limit? And here is another question: shouldn't passing a {partition: limit} dict be an alternative way of passing the list of allowed partitions? While we're at it, shouldn't it be allowed to pass a list or a set for partitions?

So I guess we need to agree on which calls are allowed and what they do.

# should this only get from tp1?
consumer.getmany(tp1, max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# should this also get from tp3 without any limit?
consumer.getmany(tp1, tp2, tp3, max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# should this only get from tp1 and tp2? Or from every other partition too, without any limit?
consumer.getmany(max_records=5, max_records_per_partition={tp1: 3, tp2: None})
# clearer and simpler, but creates another syntax
consumer.getmany({tp1: 3, tp2: None}, max_records=5)
# probably also allowed if the previous one is, but is that too many syntaxes?
consumer.getmany([tp1, tp2], max_records=5)
# should this be applied to every similar method?
consumer.seek_to_end([tp1, tp2])

I think having consumer.getmany(max_records=5, max_records_per_partition={tp1: 3, tp2: None}) fetch from every partition is the simplest one to implement without changing the syntax.

But consumer.getmany({tp1: 3, tp2: None}, max_records=5) is the most intuitive.
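If the dict and list syntaxes were accepted, the positional arguments would need to be normalized first. The sketch below shows one possible way to do that; normalize_partitions is a hypothetical helper, and TopicPartition is redefined locally as a stand-in for aiokafka's class so the example is self-contained.

```python
from collections import namedtuple

# Local stand-in for aiokafka's TopicPartition (an assumption made so the
# sketch runs on its own).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def normalize_partitions(*args):
    """Return a dict mapping partition -> limit (None = no per-partition limit).

    Accepts the three call styles discussed above:
        getmany(tp1, tp2), getmany([tp1, tp2]), getmany({tp1: 3, tp2: None})
    An empty result stands for "all assigned partitions".
    """
    limits = {}
    for arg in args:
        # TopicPartition is itself a tuple, so it must be checked first.
        if isinstance(arg, TopicPartition):
            limits.setdefault(arg, None)
        elif isinstance(arg, dict):
            limits.update(arg)
        elif isinstance(arg, (list, tuple, set, frozenset)):
            for tp in arg:
                limits.setdefault(tp, None)
        else:
            raise TypeError(f"unsupported partition argument: {arg!r}")
    return limits
```

With this normalization, the dict form carries both the list of partitions and their limits, which is what makes it the more intuitive of the candidate syntaxes. Whether an empty argument list plus a separate max_records_per_partition dict should restrict the fetched partitions is still the open question.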

aure-olli avatar Jul 08 '19 02:07 aure-olli