
Profiling streamz pipeline with from_kafka_batched

Open · skmatti opened this issue 6 years ago · 2 comments

I am trying to profile the simple streamz pipeline below, which uses the `from_kafka_batched` method:

```python
import cProfile

from streamz import Stream
from tornado import gen
from tornado.ioloop import IOLoop


def increment(x):
    # x is one batch: a list of Kafka message values
    return [int(i) + 1 for i in x]


async def test_from_kafka_async():
    # Kafka properties
    topic = "my-topic"
    brokers = "my-broker:9092"
    consumer_conf = {
        "bootstrap.servers": brokers,
        "group.id": "new_group",
        "session.timeout.ms": 6000,
    }

    # Returns a stream of lists of Kafka values
    stream = Stream.from_kafka_batched(
        topic, consumer_conf, poll_interval="5s", npartitions=1,
        asynchronous=True, dask=False,
    )
    stream.start()

    stream.map(increment).sink(print)
    # Give the pipeline time to read and process the messages
    await gen.sleep(5)


cProfile.run("IOLoop().run_sync(test_from_kafka_async)", sort="time")
```

cProfile does not record the `increment` function (nor any of the downstream functions, when I tried adding several). Also, the CPU time listed for the `from_kafka_batched` method seems to be incorrect.

Part of the cProfile stats:

```
118653 function calls (114743 primitive calls) in 5.239 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        6    5.005    0.834    5.005    0.834 {method 'poll' of 'select.epoll' objects}
      239    0.050    0.000    0.050    0.000 {built-in method marshal.loads}
  689/682    0.013    0.000    0.044    0.000 {built-in method builtins.__build_class__}
      865    0.011    0.000    0.011    0.000 {method 'sub' of '_sre.SRE_Pattern' objects}
    21/19    0.010    0.000    0.021    0.001 {built-in method _imp.create_dynamic}
      623    0.010    0.000    0.010    0.000 {method 'findall' of '_sre.SRE_Pattern' objects}
     1253    0.008    0.000    0.008    0.000 {built-in method posix.stat}
      582    0.006    0.000    0.024    0.000 <frozen importlib._bootstrap_external>:1247(find_spec)
    287/1    0.006    0.000    5.239    5.239 {built-in method builtins.exec}
      239    0.004    0.000    0.008    0.000 <frozen importlib._bootstrap_external>:848(get_data)
        2    0.000    0.000    0.232    0.116 streamz_from_kafka.py:15(test_from_kafka_async)
        1    0.000    0.000    0.232    0.232 sources.py:288(from_kafka_batched)
```
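Since only part of the stats is shown, it may help to confirm whether `increment` was recorded at all before drawing conclusions from its absence. The following is a minimal sketch, using a hypothetical synchronous `run_pipeline` in place of the Kafka pipeline: it profiles with `cProfile.Profile` and restricts the `pstats` report to rows whose `filename:lineno(function)` text matches a regex. With the original script, the same `pstats` calls could be applied to a file written by `cProfile.run(..., filename=...)`.

```python
import cProfile
import io
import pstats


def increment(x):
    return [int(i) + 1 for i in x]


def run_pipeline(batches):
    """Hypothetical synchronous stand-in for the streamz pipeline."""
    return [increment(batch) for batch in batches]


profiler = cProfile.Profile()
profiler.enable()
run_pipeline([["1", "2"], ["3", "4"]])
profiler.disable()

out = io.StringIO()
stats = pstats.Stats(profiler, stream=out).sort_stats("tottime")
# A string restriction is a regex searched in each row's "filename:lineno(function)"
stats.print_stats(r"\(increment\)")
print(out.getvalue())

# stats.stats is keyed by (filename, lineno, function_name)
recorded = any(name == "increment" for _, _, name in stats.stats)
print("increment recorded:", recorded)
```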

Replacing from_kafka_batched with emit solves the problem.

Does anyone know the reason for this?

skmatti · Jan 25 '19 17:01

> Replacing from_kafka_batched with emit solves the problem.

I'm not sure what you mean by this

Why the `gen.sleep(5)`?

> Also the CPU time listed for from_kafka_batched method seems to be incorrect

How so?

mrocklin · Jan 28 '19 17:01

> I'm not sure what you mean by this

I mean creating an empty stream and feeding it lists of values with `emit`. That pipeline is exactly the same as the one above, except that the messages are pushed in separately with `emit`.

> Why the `gen.sleep(5)`?

Through a few trials, I found that the pipeline above takes about 2-3 seconds to read the messages from Kafka and finish processing. Decreasing the wait time resulted in partial output (the code above should print 10 lines, as the Kafka topic has 10 partitions) or no output at all.
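As an alternative to a fixed `gen.sleep(5)`, one option is to wait until the expected number of outputs has arrived, with a timeout as a safety net. This is a sketch in plain asyncio under stated assumptions: 10 expected batches (one per partition, as described above), and a hypothetical `fake_kafka_source` coroutine standing in for the Kafka source. With streamz, the counting function would be the one passed to `.sink(...)`.

```python
import asyncio

EXPECTED_BATCHES = 10  # assumption: one batch per partition


def increment(x):
    return [int(i) + 1 for i in x]


async def fake_kafka_source(emit, n_batches):
    """Hypothetical stand-in for a Kafka source: emits batches at intervals."""
    for i in range(n_batches):
        await asyncio.sleep(0.01)
        emit([str(i), str(i + 1)])


async def run_until_done(expected, timeout):
    results = []
    done = asyncio.Event()

    def sink(batch):
        # Count outputs and signal once the expected number has arrived
        results.append(increment(batch))
        if len(results) >= expected:
            done.set()

    producer = asyncio.create_task(fake_kafka_source(sink, expected))
    # Wait for the outputs instead of sleeping a fixed time;
    # wait_for raises asyncio.TimeoutError if they never all arrive.
    await asyncio.wait_for(done.wait(), timeout=timeout)
    await producer
    return results


results = asyncio.run(run_until_done(EXPECTED_BATCHES, timeout=5))
print(len(results))  # 10
```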

> How so?

According to the cProfile stats, `from_kafka_batched` takes only 0.232 seconds of the 2-3 seconds required to process the stream. That seems suspicious, given that the only downstream function takes less than a millisecond to process.
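One general cProfile property may be relevant when reading the 0.232 s figure: a function's `cumtime` covers only the time spent inside that call, not work it schedules for the event loop to run later; that work is attributed to whichever function the loop eventually calls. The thread does not confirm that this explains the numbers above. This sketch only illustrates the property, with hypothetical stand-ins `build_pipeline` (a setup call that schedules work) and `handle_batch` (the scheduled work).

```python
import asyncio
import cProfile
import pstats
import time


def handle_batch(batch):
    """Hypothetical downstream work that the event loop runs later."""
    time.sleep(0.2)  # cProfile's default timer is wall-clock, so this time is counted
    return [x + 1 for x in batch]


def build_pipeline(loop):
    """Hypothetical setup call: it only schedules work and returns immediately."""
    loop.call_soon(handle_batch, [1, 2, 3])


async def main():
    build_pipeline(asyncio.get_running_loop())
    await asyncio.sleep(0.3)  # let the scheduled callback run


profiler = cProfile.Profile()
profiler.enable()
asyncio.run(main())
profiler.disable()

stats = pstats.Stats(profiler)
# Each value is (primitive calls, total calls, tottime, cumtime, callers)
cumtime = {key[2]: value[3] for key, value in stats.stats.items()}
print(f"build_pipeline cumtime: {cumtime['build_pipeline']:.3f}s")
print(f"handle_batch cumtime:   {cumtime['handle_batch']:.3f}s")
```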

skmatti · Jan 28 '19 19:01