pykafka
batching parameter on consume()
The reason is:
We need to limit database connection access by grouping key accesses together, hence the idea of processing messages in mini-batches:
msg_list = consume(10msg,...)
We can do this ourselves in a loop, but I'm wondering if it could be part of pykafka.
Thanks for the suggestion, @arita37. A count parameter on consume is certainly something we could add if enough users thought it would be useful. It's worth noting that it would only control how many messages were returned from the consume call and would have no effect on the batching logic that the consumer uses to fetch from kafka (which is controlled by kwargs on the consumer like fetch_message_max_bytes and num_consumer_fetchers, among others).
Your mention of needing to "limit database connection access" does give me some pause, though. Are you imagining an argument that would perform some kind of grouping based on message content? If so, that's better handled by producing with a partition key. It's also straightforward enough to build the list in client code, like so:
messages = [consumer.consume() for _ in xrange(10)]
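One caveat with the one-liner above: if the consumer is constructed with a finite `consumer_timeout_ms`, `consume()` can return `None` when the timeout expires, which would leave `None` entries in the list. A small helper that stops on `None` is slightly safer. This is a sketch of client-side code; `consume_batch` and `FakeConsumer` are hypothetical names, not part of pykafka:

```python
def consume_batch(consumer, size):
    """Collect up to `size` messages from a pykafka-style consumer.

    Stops early if consume() returns None (e.g. when the consumer was
    created with a finite consumer_timeout_ms and the timeout expires).
    """
    batch = []
    for _ in range(size):
        msg = consumer.consume()
        if msg is None:
            break
        batch.append(msg)
    return batch


# Stand-in for a real SimpleConsumer, just to demonstrate the helper.
class FakeConsumer:
    def __init__(self, msgs):
        self._msgs = iter(msgs)

    def consume(self):
        return next(self._msgs, None)


batch = consume_batch(FakeConsumer(["a", "b", "c"]), 10)
# batch is ["a", "b", "c"]: the helper stopped when consume() returned None
```

With a real pykafka `SimpleConsumer` in place of `FakeConsumer`, the helper behaves the same way: it returns at most `size` messages per call.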
- Yes, that is the core of the question: grouping messages based on their contents, then post-processing each group. Usually the latency comes from post-processing the messages (database access).
Pykafka aims to remain completely ignorant of the contents of the messages it processes, so I don't think mechanisms allowing grouping based on message content will ever be added. That said, Kafka itself provides partition keying as a way to consume messages in meaningful subgroups within a topic. You can pass an instance of the HashingPartitioner to the producer's partitioner kwarg. This will enable you to use the partition_key kwarg on Producer.produce to group messages logically (perhaps by some piece of their contents) into partitions on your topic. When consuming, you can use the partitions argument on SimpleConsumer to specify the exact partitions the consumer should listen to. This will allow each consumer to consume only messages from the same logical grouping.
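To illustrate the keying idea without a live broker, here is a minimal sketch of hash-based partition assignment: every message produced with the same partition key lands on the same partition, so a consumer pinned to that partition sees a complete logical group. The `assign_partition` function below is an illustrative stand-in, not pykafka's actual implementation; the HashingPartitioner performs an equivalent hash-the-key-modulo-partition-count assignment (with a configurable hash function):

```python
import zlib


def assign_partition(partition_key, num_partitions):
    """Illustrative stand-in for a hashing partitioner: map a partition
    key to a partition index.  crc32 is used here only because it is
    stable and deterministic across runs."""
    return zlib.crc32(partition_key) % num_partitions


# All messages sharing a key map to one partition, so a consumer
# listening to only that partition sees the whole logical group.
keys = [b"user-1", b"user-2", b"user-1"]
assignments = [assign_partition(k, 4) for k in keys]
assert assignments[0] == assignments[2]  # same key -> same partition
```

In real pykafka usage, the key would be supplied via the `partition_key` kwarg on `Producer.produce`, and the grouping would be recovered on the consumer side by restricting the `partitions` argument on `SimpleConsumer`.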
I would also recommend examining your topic setup and considering using more topics to achieve your logical grouping. If every message in your topic is part of the same logical group, you'll be able to take advantage of pykafka's automatic consumption balancing via the BalancedConsumer class while maintaining your application's particular logical grouping of messages.