streamz
Profiling streamz pipeline with from_kafka_batched
I am trying to profile the simple streamz pipeline below, which uses the `from_kafka_batched` method:
```python
from streamz import Stream
from time import time, sleep
from tornado import gen


def increment(x):
    return [int(i) + 1 for i in x]


async def test_from_kafka_async():
    # kafka props
    topic = "my-topic"
    brokers = 'my-broker:9092'
    consumer_conf = {'bootstrap.servers': brokers, 'group.id': 'new_group', 'session.timeout.ms': 6000}
    # emits lists of Kafka message values
    stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='5s', npartitions=1,
                                       asynchronous=True, dask=False)
    stream.start()
    stream.map(increment).sink(print)
    await gen.sleep(5)


from tornado.ioloop import IOLoop
import cProfile
cProfile.run('IOLoop().run_sync(test_from_kafka_async)', sort='time')
```
cProfile fails to profile the `increment` function (or any of the downstream functions, when I tried adding several). Also, the CPU time listed for the `from_kafka_batched` method seems to be incorrect.
Part of the cProfile stats:
```
   118653 function calls (114743 primitive calls) in 5.239 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        6    5.005    0.834    5.005    0.834 {method 'poll' of 'select.epoll' objects}
      239    0.050    0.000    0.050    0.000 {built-in method marshal.loads}
  689/682    0.013    0.000    0.044    0.000 {built-in method builtins.__build_class__}
      865    0.011    0.000    0.011    0.000 {method 'sub' of '_sre.SRE_Pattern' objects}
    21/19    0.010    0.000    0.021    0.001 {built-in method _imp.create_dynamic}
      623    0.010    0.000    0.010    0.000 {method 'findall' of '_sre.SRE_Pattern' objects}
     1253    0.008    0.000    0.008    0.000 {built-in method posix.stat}
      582    0.006    0.000    0.024    0.000 <frozen importlib._bootstrap_external>:1247(find_spec)
    287/1    0.006    0.000    5.239    5.239 {built-in method builtins.exec}
      239    0.004    0.000    0.008    0.000 <frozen importlib._bootstrap_external>:848(get_data)
        2    0.000    0.000    0.232    0.116 streamz_from_kafka.py:15(test_from_kafka_async)
        1    0.000    0.000    0.232    0.232 sources.py:288(from_kafka_batched)
```
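To check whether a particular function was recorded at all, instead of scanning a time-sorted listing, the collected stats can be filtered by function name. The sketch below is stdlib-only and uses neither Kafka nor streamz. `toy_pipeline` is a hypothetical stand-in that hands each batch to `increment` through an asyncio event-loop callback, and `calls_to` is a hypothetical helper that reads the raw `pstats.Stats.stats` dictionary. It uses a `cProfile.Profile` object rather than `cProfile.run`, so the stats can be inspected programmatically.

```python
import asyncio
import cProfile
import pstats


def increment(x):
    # Same transformation as in the pipeline above: parse each value and add one.
    return [int(i) + 1 for i in x]


async def toy_pipeline(batches):
    # Hypothetical stand-in for a Kafka source: each batch reaches `increment`
    # through an event-loop callback instead of a direct call.
    loop = asyncio.get_running_loop()
    results = []
    for batch in batches:
        loop.call_soon(lambda b=batch: results.append(increment(b)))
    await asyncio.sleep(0.01)  # let the scheduled callbacks run
    return results


def calls_to(stats, func_name):
    # stats.stats maps (filename, lineno, funcname) to
    # (primitive calls, total calls, tottime, cumtime, callers).
    return sum(value[1] for (_, _, name), value in stats.stats.items()
               if name == func_name)


profiler = cProfile.Profile()
profiler.enable()
output = asyncio.run(toy_pipeline([["1", "2"], ["3"], ["4", "5", "6"]]))
profiler.disable()

stats = pstats.Stats(profiler)
stats.sort_stats("cumulative").print_stats("increment")  # only rows matching "increment"
print("increment calls:", calls_to(stats, "increment"))
```

In this toy, `increment` is recorded with three calls, because the loop callbacks run on the same thread that enabled the profiler. This does not settle whether the same holds for the Kafka-backed pipeline.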
Replacing `from_kafka_batched` with `emit` solves the problem.
Does anyone know the reason behind this?
> Replacing from_kafka_batched with emit solves the problem.

I'm not sure what you mean by this.

Why the `gen.sleep(5)`?

> Also the CPU time listed for from_kafka_batched method seems to be incorrect

How so?
> I'm not sure what you mean by this

I created an empty stream and fed it the lists of values with `emit`. This streamz pipeline is exactly the same as the one above, except that the messages are pushed in separately with `emit`.
> Why the gen.sleep(5)?

After a few trials, I found that the pipeline above takes about 2-3 seconds to read the messages from Kafka and finish processing. Decreasing the wait time resulted in partial output (the code above should print 10 lines, as the Kafka topic has 10 partitions) or no output at all.
> How so?

According to the cProfile stats, `from_kafka_batched` accounts for only 0.232 seconds of the 2-3 seconds required to process the stream. That seems suspicious, since the only downstream function takes less than a millisecond and so cannot account for the remaining time.
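A general cProfile property that may bear on this, offered as a hypothesis rather than a diagnosis: a function's `cumtime` covers only the time while its own frame is on the stack. Work that it merely schedules on the event loop is charged to the callback that runs later, not to the function that scheduled it. The stdlib-only toy below illustrates this; `build_pipeline`, `busy_work`, and `cumtime` are hypothetical names, and whether `from_kafka_batched` defers its polling in this way is an assumption not checked here.

```python
import asyncio
import cProfile
import pstats


def busy_work():
    # Hypothetical CPU-bound stand-in for downstream processing.
    total = 0
    for i in range(200_000):
        total += i * i
    return total


def build_pipeline(loop):
    # Only *schedules* the work and returns at once, loosely like a
    # constructor that wires up a source and leaves polling to the loop.
    loop.call_soon(busy_work)


async def main():
    build_pipeline(asyncio.get_running_loop())
    await asyncio.sleep(0.01)  # the scheduled callback runs while we wait


def cumtime(stats, func_name):
    # Index 3 of each pstats value tuple is the cumulative time in seconds.
    return sum(value[3] for (_, _, name), value in stats.stats.items()
               if name == func_name)


profiler = cProfile.Profile()
profiler.enable()
asyncio.run(main())
profiler.disable()

stats = pstats.Stats(profiler)
print(f"build_pipeline cumtime: {cumtime(stats, 'build_pipeline'):.4f} s")
print(f"busy_work cumtime:      {cumtime(stats, 'busy_work'):.4f} s")
```

Here `build_pipeline` reports a tiny `cumtime` while `busy_work` carries the processing time, even though `build_pipeline` is what set the work up.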