pykafka
Allow for producing to topics dynamically.
Currently, a new producer needs to be instantiated for each topic that you want to produce to. I am working on a service that intakes messages from many producers and then sends them on to different Kafka topics based on the content of each message. Instantiating a new producer for each topic is a lot of overhead. It would be handy if there were a way to produce to a topic by specifying the topic as a parameter to a produce method.
This would be extra efficient, not just because there would be fewer producer instances and threads around, but because you could combine and send message sets for multiple topics in the same request.
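As a stopgap with the current API, one can at least cache one producer per topic so each is instantiated only once. A minimal sketch, assuming the standard pykafka client; the broker address, topic name, and the `produce_to` helper are illustrative:

```python
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")  # placeholder broker address
_producers = {}  # topic name -> producer, created lazily on first use

def produce_to(topic_name, message):
    """Route a message to the named topic, reusing producers across calls."""
    if topic_name not in _producers:
        _producers[topic_name] = client.topics[topic_name].get_producer()
    _producers[topic_name].produce(message)

produce_to(b"events", b"hello")
```

This avoids repeated instantiation, but each cached producer still carries its own worker threads, so it doesn't deliver the batching benefit described above.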
It's not the smallest change, but I can see how we could upgrade the current `Producer` to become a `MultiTopicProducer` (horrible name, I know), and then provide a new `Producer` with the current interface, which would just be a thin class around a `MultiTopicProducer`.
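For illustration only, the relationship might be shaped like this; `MultiTopicProducer` and these signatures are hypothetical, not pykafka API:

```python
class MultiTopicProducer(object):
    """Hypothetical: owns the worker threads and batches for all topics."""

    def produce(self, topic, message, partition_key=None):
        # Batch per topic/partition; message sets for multiple topics
        # could then share a single broker request.
        raise NotImplementedError

class Producer(object):
    """The current single-topic interface, reduced to a thin wrapper."""

    def __init__(self, multi_producer, topic):
        self._inner = multi_producer
        self._topic = topic

    def produce(self, message, partition_key=None):
        return self._inner.produce(self._topic, message, partition_key)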
+1
The same applies to consumers, noted in #395
Are there any plans for this? I would find it useful as well.
Any update on this? I see it has been postponed multiple times.
It's not a very high priority at the moment. It's a really big change for not that big of a benefit from my perspective, though I gather from the activity on this thread that a lot of folks feel differently.
In our use case we are trying to produce to and consume from thousands of topics at the same time. Without this feature, that has a big performance impact on our system.
Is there a way to prioritize this feature?
Just noticed this about the Kafka consumer in 0.9.0:

> Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list of topics it wants to subscribe to through subscribe(List, ConsumerRebalanceListener), or subscribe to all topics matching certain pattern through subscribe(Pattern, ConsumerRebalanceListener).
Wonder how we can do that with pykafka?
@jianbin-wei What's the source of that quote? I'd like to know if this is a recommended practice, something an existing client already does, or something else.
@emmett9001 It's in the Java consumer docs under "Consumer Groups and Topic Subscriptions".
Yes, last I checked kafka-python's API is similar. This is a reasonable case for supporting it in pykafka, though it's far from a trivial change.
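For reference, here is roughly what that looks like in kafka-python; the broker address, group id, and pattern are placeholders:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         group_id="my-group")
# Subscribe to all topics matching a regex; the subscription is
# re-evaluated as topics are created and deleted.
consumer.subscribe(pattern="^topic-test.*")

for message in consumer:
    print(message.topic, message.partition, message.offset)
```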
Any opinions on how the API should look if this gets implemented?
The basic issue is that the only way pykafka currently has to get a consumer or producer instance is to call a method on a `Topic` instance. Supporting multi-topic consumers and producers would require adding a method to `KafkaClient` like `get_consumer(topics)` that accepts a sequence of `Topic` instances. Beyond that, `SimpleConsumer` would have to hold a sequence of `Topic`s rather than a single one, and `consume()` might have to incorporate a `topics` parameter to control which topic is being consumed from. This in turn would require the consumer to store its queued messages with some way to retrieve them on a per-topic basis. The way the consumer holds its partitions would also have to be altered.

I admit that this sounds like only slightly less problematic of a change than I'd anticipated. It's the kind of thing that I'd be able to work on over the course of a few Fridays. Of course, if anyone on this thread would like to take a crack at it, I'm very open to a substantive pull request addressing this issue.
@emmett9001 One thing to note from your comment: this subscription can be dynamic, meaning a pattern. Say it subscribes to "topic-test*"; all existing and future topics starting with "topic-test" would then be consumed by this instance. A watch on topics would do it.
Right now I wrap around `BalancedConsumer` for this purpose, but it is not efficient. It would be great if this feature were implemented.
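A sketch of that kind of wrapper, assuming one `BalancedConsumer` per matching topic and periodic metadata refreshes; the group name, ZooKeeper address, and the refresh function are assumptions:

```python
import re
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")  # placeholder broker address
pattern = re.compile(b"^topic-test")
consumers = {}

def refresh_subscriptions():
    """Start a BalancedConsumer for each newly created matching topic.

    Calling this periodically picks up new topics, but each topic gets
    its own consumer instance -- the inefficiency noted above.
    """
    client.update_cluster()  # re-fetch topic metadata from the brokers
    for name in list(client.topics.keys()):
        if pattern.match(name) and name not in consumers:
            consumers[name] = client.topics[name].get_balanced_consumer(
                consumer_group=b"my-group",
                zookeeper_connect="127.0.0.1:2181",
            )
```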
The following are my thoughts after a quick read of the Java client, plus my experience writing a wrapper to implement the idea of dynamically producing to and consuming from topics. Here I only comment on the consuming side. @emmett9001 @yungchin @ottomata, please let me know your thoughts.
> Supporting multi-topic consumers and producers would require adding a method to `KafkaClient` like `get_consumer(topics)` that accepts a sequence of `Topic` instances.
I am thinking of passing a list of topic names (not `Topic` instances) or a name pattern. A method to dynamically change the consumed topics later would also be handy, so there is no need to call `get_consumer` again and again. Some change to `BalancedConsumer` may work.
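Concretely, the entry point being suggested might look like this; every name here is a placeholder rather than existing pykafka API:

```python
# Hypothetical: subscribe by name list or pattern, not Topic instances
consumer = client.get_consumer(topics=[b"logs", b"metrics"])
consumer = client.get_consumer(pattern=b"^topic-test")

# Hypothetical: change the subscription without building a new consumer
consumer.subscribe([b"logs", b"metrics", b"audit"])
```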
> Beyond that, `SimpleConsumer` would have to hold a sequence of `Topic`s rather than a single one,
The interface of `SimpleConsumer()` would take a sequence of topic/partition pairs. Consuming from multiple topics also requires balancing across all available partitions to get partition-level scalability.
> and `consume()` might have to incorporate a `topics` parameter to control which topic is being consumed from.
IMO, it would be fine without a topic parameter. The user should expect a mix of messages from all subscribed topics. Each returned message should carry metadata about its topic/partition (and maybe other information too) so users can handle messages accordingly.
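Under that design a consume loop might look like this; the `topic` attribute on the returned message and the `handle` function are hypothetical (pykafka's `Message` exposes `partition_id` and `offset`, but no topic, since current consumers are single-topic):

```python
while True:
    message = consumer.consume(block=True)  # from any subscribed topic
    if message is None:
        continue
    # Hypothetical per-message metadata used to route handling
    handle(message.topic, message.partition_id, message.offset, message.value)
```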
> This in turn would require the consumer to store its queued messages with some way to retrieve them on a per-topic basis.
With the above idea, there is therefore no need to hold internal queues for each topic/partition. Only the offsets of consumed messages need to be held per topic/partition.
In our use case, we saw large memory usage, as the consumer holds messages from each topic/partition it consumes from, up to the maximum queued message count. By default that is 2000, and each message in our case is 10 KB. We have about 1000 topic/partitions, which gives roughly a 20 GB memory footprint (1000 partitions × 2000 messages × 10 KB ≈ 20 GB).
> The way the consumer holds its partitions would also have to be altered.
If messages are still stored per partition, that would not need to change. If they are to be stored in one single queue, then yes.
See https://github.com/Parsely/pykafka/issues/786 requesting regex-based topic subscription, which is blocked by this ticket.
Confluent Kafka Connect and sink connectors are the best solution to this problem.