pykafka
Each new consumer creation takes more time than the previous one
I'm trying to create several consumers with the same consumer group to allow several clients to read from the same Kafka topic. But for some reason, each new connection takes longer to establish than the one before. This happens with pykafka 2.7.0. The code snippet below demonstrates the issue, followed by the console output.
Please advise. Thanks!
(py36) vserhei$ cat time_connection_test.py
import datetime
import pykafka
KAFKA_SERVER = "192.168.233.3:9092"
client = pykafka.KafkaClient(hosts=KAFKA_SERVER)
consumer_list = []
for i in range(20):
    start_time = datetime.datetime.now()
    kafka_topic = client.topics[b'test_topic_1']
    consumer = kafka_topic.get_simple_consumer(consumer_group=b'test_cg')
    consumer_list.append(consumer)
    elapsed = datetime.datetime.now() - start_time
    print(i, elapsed)
(py36) vserhei$
(py36) vserhei$ python time_connection_test.py
0 0:00:00.878086
1 0:00:00.527819
2 0:00:00.624658
3 0:00:00.898877
4 0:00:01.174601
5 0:00:01.447009
6 0:00:01.745823
7 0:00:02.002713
8 0:00:02.270587
9 0:00:02.540595
10 0:00:02.815236
11 0:00:03.134629
12 0:00:03.373099
13 0:00:03.649564
14 0:00:03.915460
15 0:00:04.223236
16 0:00:04.470146
17 0:00:04.769616
18 0:00:05.008634
19 0:00:05.364740
(py36) vserhei$
@emmett9001 Quick profiling showed that pykafka gets stuck in the get_group_coordinator() method of the Cluster instance.
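For reference, a minimal sketch of how that profiling can be reproduced with the standard library's cProfile (it assumes the same server, topic, and group as the snippet above; the sort key and line count are arbitrary choices):

import cProfile
import pstats
import pykafka

client = pykafka.KafkaClient(hosts="192.168.233.3:9092")
topic = client.topics[b'test_topic_1']

# Profile only the repeated consumer creation.
profiler = cProfile.Profile()
profiler.enable()
for _ in range(20):
    topic.get_simple_consumer(consumer_group=b'test_cg')
profiler.disable()

# Sort by cumulative time; get_group_coordinator() should show up
# near the top if it is the bottleneck.
pstats.Stats(profiler).sort_stats('cumulative').print_stats(10)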
Thanks @vikt0rs. I'll have to look into this more deeply, but it looks like something we should fix.
Maybe this will be helpful: the issue doesn't occur if the consumer is created with the use_rdkafka=True option.
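For comparison, the rdkafka-backed consumer is created with the same call plus one flag (a sketch; it assumes pykafka was installed with librdkafka support):

# Same call as in the snippet above, but backed by librdkafka.
consumer = kafka_topic.get_simple_consumer(consumer_group=b'test_cg', use_rdkafka=True)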
@vikt0rs Though pykafka should support arbitrary numbers of consumers per thread, I can't think of a situation in which consuming the same topic with more than one consumer in a single thread would be preferable to consuming the topic once and distributing the results to multiple downstream consumers of the messages. Is there a reason you're creating so many consumers in the same thread?
Well, the reason is quite simple: to simplify the code and get rid of multiple downstream consumers.
Sure, this example is synthetic, but it illustrates the issue. In my case there is a Tornado-based web application that forwards messages from Kafka to users via WebSocket, so a new consumer is created for each new user connection.
If this approach is wrong and the current behavior is not a bug, please suggest the proper pattern for this case.
Thanks!
@vikt0rs Like I said above, each successive consumer instantiation becoming slower is definitely a bug. That said, for your use case I'd try to read from Kafka with a single thread not directly tied to any particular user and write those messages to shared memory. I'd then have the user-specific logic read from that shared memory instead of directly from Kafka.
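A minimal sketch of that pattern, assuming a plain background thread and per-user queue.Queue objects as the shared memory (the names register_user and fan_out are illustrative, not pykafka APIs):

import queue
import threading
import pykafka

KAFKA_SERVER = "192.168.233.3:9092"

# One shared consumer for the whole process.
client = pykafka.KafkaClient(hosts=KAFKA_SERVER)
consumer = client.topics[b'test_topic_1'].get_simple_consumer(consumer_group=b'test_cg')

# Each user connection registers a queue instead of creating its own consumer.
subscribers = []
subscribers_lock = threading.Lock()

def register_user():
    q = queue.Queue()
    with subscribers_lock:
        subscribers.append(q)
    return q

def fan_out():
    # Single reader thread: consume each message once, then
    # distribute it to every registered subscriber queue.
    for message in consumer:
        if message is not None:
            with subscribers_lock:
                for q in subscribers:
                    q.put(message.value)

threading.Thread(target=fan_out, daemon=True).start()

# A user-connection handler would then call register_user() and
# read from its own queue (e.g. q.get()) instead of from Kafka.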
I attempted to replicate this within Parse.ly's internal network and was unable to do so. Consumer instantiation times remain constant up to 100+ consumers in the same process.
@vikt0rs Do you have more information available from the profiling test you ran?
Thanks for working on this. Please let me know whether you used the same snippet for your tests or modified it. I'll check for the profiling data on my work PC when I'm back from vacation.
My test snippet is identical to the one posted above, with the exception of KAFKA_SERVER and test_topic_1, which I've changed as needed for my Kafka cluster and topic.