confluent-kafka-python

Close producer's connection

almaz1213 opened this issue 3 years ago • 8 comments

How can I close the producer's broker connection manually? It seems to leave zombie processes when an asyncio loop is run in threads.

Thanks in advance

almaz1213 avatar May 23 '22 09:05 almaz1213

you can't - the connections are managed entirely by librdkafka until destroyed, and the only place that happens is in the producer destructor.

I wonder if there is a reason to support usage with the with statement?

In typical scenarios, I'd expect reference counting to be enough to clean up the producer deterministically, but maybe there are legitimate scenarios where the producer instance is referenced in a data structure with cycles (I don't know of one off the top of my head, but maybe you have one), in which case it wouldn't be.

my sense is we should probably implement this. i.e.

  def __enter__(self):
      ...
  def __exit__(self, exc_type, exc_value, traceback):
      ...

thoughts?

mhowlett avatar May 23 '22 14:05 mhowlett
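As a point of reference, context-manager behaviour can already be layered on top of the existing API from user code. A minimal sketch, assuming that flushing queued messages on exit is the desired cleanup (the helper name, broker address, and topic name are placeholders, not part of the library):

  from contextlib import contextmanager

  from confluent_kafka import Producer

  @contextmanager
  def managed_producer(config):
      # Hypothetical helper: yields a Producer and flushes queued
      # messages on exit. The underlying librdkafka connections are
      # still only released when the Producer object is destroyed.
      producer = Producer(config)
      try:
          yield producer
      finally:
          producer.flush()

  # Placeholder broker and topic names.
  with managed_producer({"bootstrap.servers": "localhost:9092"}) as p:
      p.produce("example-topic", value=b"hello")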

Thanks @mhowlett

In addition, I would recommend that the Confluent C lib developers:

  1. use one connection for consumer, producer, and admin. Why do we need to establish many socket connections from a single node/app/service?
  2. in the consumer, group.id should be tied to the topic, not to consumer initialisation, because consumption from a topic depends on that topic's partitions. So it should look something like consumer.subscribe(topic, groupId) and consumer.unsubscribe(topic). Otherwise (as it is now) we must create a new consumer instance for each topic that uses a different group, and each instance keeps a new socket connection - not good architecture

almaz1213 avatar May 23 '22 14:05 almaz1213

would recommend confluent C lib developers to...

The Java API also opens multiple connections for different protocols, so I wouldn't call it "not good architecture". Each of the admin, consumer, and producer clients has its own TCP socket and buffer settings that can be tuned independently.

group.id must be related on topic, not on consumer initialisation. because consumption from topic related on partitions of the topic. so, it should look smth like consumer.subscribe(topic, groupId

Please clarify. You don't need unique groups or consumers per topic. One consumer instance (with the group defined in the config) is already able to subscribe to multiple topics.

OneCricketeer avatar Jun 01 '22 05:06 OneCricketeer
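As an illustration of that point: a single consumer with one group.id can subscribe to several topics at once. A minimal sketch (broker address, group name, and topic names are placeholders):

  from confluent_kafka import Consumer

  consumer = Consumer({
      "bootstrap.servers": "localhost:9092",
      "group.id": "example-group",
      "auto.offset.reset": "earliest",
  })

  # One consumer instance, one group, several topics.
  consumer.subscribe(["orders", "payments", "shipments"])

  try:
      while True:
          msg = consumer.poll(1.0)
          if msg is None:
              continue
          if msg.error():
              print(msg.error())
              continue
          print(msg.topic(), msg.value())
  finally:
      consumer.close()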

@OneCricketeer Actually, apps usually consume from one topic as a group member and from another topic as a unique member (with a unique groupID). So the groupID should be related to the topic, not to the whole connection. Otherwise we need to create a new consumer object for each topic with its own groupID, and that creates a lot of threads. That's not good.

almaz1213 avatar Jun 01 '22 05:06 almaz1213

But you can have multiple topics in a group. What is considered "usual" is heavily dependent on the use case.

OneCricketeer avatar Jun 01 '22 05:06 OneCricketeer

You can, if you need it. But what I need is what I explained above.

almaz1213 avatar Jun 01 '22 05:06 almaz1213

We can argue consumer semantics, but I'm curious if you've experienced the same issues with aiokafka library instead?

OneCricketeer avatar Jun 01 '22 05:06 OneCricketeer

I am considering other libs too. The Confluent solution with a C lib (librdkafka) looks nice and fast, but this issue makes it worse - it spends more resources and time opening a new thread for each topic's consumption.

almaz1213 avatar Jun 01 '22 06:06 almaz1213

use one connection for consumer, producer, and admin. Why do we need to establish many socket connections from a single node/app/service?

for performance reasons, the client talks directly to the brokers it needs to. this is how the kafka protocol works.

in the consumer, group.id should be tied to the topic, not to consumer initialisation

the use case for consumer groups is very high throughput scenarios where many consumer instances are required to handle all the messages in a topic. the api matches this - in such scenarios, you will only want one group per consumer. your suggestion would be an alternate api, though the one used by this client is normal across other clients.

mhowlett avatar Oct 24 '22 21:10 mhowlett
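To make that concrete: scaling consumption of a topic means starting the same consumer code in several processes with the same group.id, and the group protocol splits the topic's partitions across those instances. A minimal sketch (broker, group, and topic names are placeholders):

  from confluent_kafka import Consumer

  def print_assignment(consumer, partitions):
      # Called on rebalance; shows which partitions this instance owns.
      print("assigned:", [p.partition for p in partitions])

  consumer = Consumer({
      "bootstrap.servers": "localhost:9092",
      "group.id": "high-throughput-group",  # same group in every process
  })
  consumer.subscribe(["big-topic"], on_assign=print_assignment)

  try:
      while True:
          msg = consumer.poll(1.0)
          if msg is None or msg.error():
              continue
          # ... process msg.value() ...
  finally:
      consumer.close()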