
to_kafka throughput

Open roveo opened this issue 4 years ago • 13 comments

I'm testing to_kafka sink and its throughput is limited by polltime (0.2 sec). Looks like self.producer.poll(0) only polls for one message at a time and so only one callback is called every 0.2 seconds.

This fails:

def test_to_kafka_throughput():
    # assumes the helpers used elsewhere in streamz's Kafka tests (kafka_service, wait_for)
    ARGS = {'bootstrap.servers': 'localhost:9092'}
    with kafka_service() as kafka:
        _, TOPIC = kafka
        source = Stream.from_iterable(range(100)).map(lambda x: str(x).encode())
        sink = source.to_kafka(TOPIC, ARGS)
        out = sink.sink_to_list()

        source.start()
        wait_for(
            lambda: len(out) == 100,
            5,
            period=0.1,
            fail_func=lambda: print("len(out) ==", len(out))
        )

The existing test_to_kafka test doesn't catch this, because it starts waiting on the result only after all the items are emitted.

I spent some time tinkering with the code, but can't figure out what's wrong and how to fix this, so any ideas are appreciated.

roveo avatar Dec 04 '20 16:12 roveo

cc @chinmaychandak

martindurant avatar Dec 04 '20 16:12 martindurant

Yes, you're correct. Unlike the from_kafka_batched source, which makes it possible to read large batches of data from Kafka really fast over Dask, the to_kafka sink is really slow (it also does not work with Dask; users need to gather first). One of the things we were looking into was #279, but somehow that got pushed back.

Currently, we are using our own custom write_to_kafka function inside map, which can then be used over Dask as well and speeds things up by many orders of magnitude. This seems to be working really well in prod with high-speed data pipelines.

Example:

import confluent_kafka as ck

def write_to_kafka(x):
    producer = ck.Producer(producer_conf)
    for message_json in x:
        producer.produce(kafka_topic, message_json)
    producer.flush()  # For very large batches, flushing every ~100k messages or so makes things faster on the consumer side.
    return x  # recommended only for dev or monitoring purposes

output = Stream.from_kafka_batched(..., dask=True).map(aggregations).map(write_to_kafka).gather().sink_to_list()

Obviously you will need to implement your own back-pressure logic if need be in the custom writing function.
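
For example, a minimal sketch of such back-pressure handling with confluent_kafka (the function and variable names here are illustrative, not what we run in prod): produce() raises BufferError when the local queue is full, so you can serve delivery callbacks with poll() and retry:

import confluent_kafka as ck

def write_to_kafka_with_backpressure(batch, producer_conf, topic):
    # producer_conf and topic are placeholders for your own config and target topic
    producer = ck.Producer(producer_conf)
    for msg in batch:
        while True:
            try:
                producer.produce(topic, msg)
                break
            except BufferError:
                # local queue is full: serve delivery callbacks so it drains, then retry
                producer.poll(0.1)
    producer.flush()
    return batch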

We need to spend some time on the default Kafka sink for sure, it's been too long since I've taken a look at it.

chinmaychandak avatar Dec 04 '20 19:12 chinmaychandak

@chinmaychandak , is it a true statement that your batched approach offers much higher throughput, but the latency is similar (e.g., if you to have a batch with a single message in it)?

martindurant avatar Dec 04 '20 19:12 martindurant

@roveo , it seems that calling .flush() rather than .poll() would solve your failure.

martindurant avatar Dec 04 '20 19:12 martindurant

@chinmaychandak , is it a true statement that your batched approach offers much higher throughput, but the latency is similar (e.g., if you to have a batch with a single message in it)?

Yes, batching messages into the producer's local buffer queue and flushing to Kafka at reasonable intervals is definitely much faster, with higher throughput. Also, the speed-up is two-fold over Dask, since each worker can write its share of messages to Kafka in parallel.

chinmaychandak avatar Dec 04 '20 19:12 chinmaychandak

it seems that calling .flush() rather than .poll() would solve your failure

But then, if I call this for each message, latency would be equal to network round-trip time to broker.

Yes, batching messages into the producer's local buffer queue and flushing at reasonable intervals writing to Kafka is definitely much faster with higher throughput.

I was under the impression that this functionality is built into the producer: you can just call .produce() for each message, and it will buffer messages and send them when the buffer is full or at reasonable intervals. Something like partition with timeout, but inside ck.Producer itself. Am I wrong? (see confluent doc)

The same batching can be done with streamz, but I don't see why built-in buffering suddenly stops working inside a stream node.
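
For reference, that built-in buffering is controlled by the producer config, e.g. (values here are only illustrative; see the librdkafka configuration docs):

producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "linger.ms": 100,                        # wait up to 100 ms to fill a batch before sending
    "batch.num.messages": 10000,             # max messages per batch
    "queue.buffering.max.messages": 100000,  # max messages buffered in the local queue
}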

Unlike the from_kafka_batched source that makes it possible to read large batches of data from Kafka really fast over Dask, the to_kafka sink is really slow

Yes, but I can't figure out why. This is not a raw-performance issue: the latency per message is exactly 0.2 seconds, i.e. one message per polltime. So the problem isn't that the client is slow or that it isn't distributed. If I do this in a for loop with .produce, callbacks and everything, I get thousands of messages per second.

roveo avatar Dec 04 '20 20:12 roveo

Am I wrong?

No, that's exactly what I'm doing in the example snippet, flush at reasonable intervals. Don't call flush for every message.

This is not a performance issue: the latency per message is exactly 0.2 seconds.

Yes, we need to change the to_kafka sink to something faster that uses produce and flush rather than poll, while keeping the back-pressure logic intact.

chinmaychandak avatar Dec 04 '20 20:12 chinmaychandak

But .poll should just execute callbacks for new messages delivered to the broker, which should still be delivered even without force-flushing. Why is it slower than calling .flush?

import time
from threading import Thread

import confluent_kafka as ck
from streamz.utils_test import wait_for  # same helper the streamz tests use

args = {"bootstrap.servers": "localhost:9092"}
pr = ck.Producer(args)

def poll():
    # background polling loop, analogous to what to_kafka does every polltime seconds
    while True:
        pr.poll(0)
        time.sleep(0.2)

counter = 0

def cb(err, msg):
    # delivery callback, executed inside pr.poll() once the broker acks the message
    global counter
    counter += 1

th = Thread(target=poll, daemon=True)
th.start()

for i in range(100000):
    pr.produce("test", str(i).encode(), callback=cb)

wait_for(lambda: counter == 100000, 0.5)
print("Success")

This is essentially the same thing as in to_kafka, but it's lightning fast.

roveo avatar Dec 04 '20 20:12 roveo

I think I get it.

The source emits the next item only after the future it awaits on is resolved. But it is resolved only once poll is called and the previous callback is executed, so items are emitted and delivered one by one after each .poll() call, every 0.2 seconds.

roveo avatar Dec 04 '20 20:12 roveo

So to_kafka.update() shouldn't return a future, but should retain refs until the callback is called. If there's an error passed to the callback, it should retry or raise.
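
Something like this, as a rough sketch only (it assumes streamz's _retain_refs/_release_refs reference-counting helpers and a module-level logger, and it drops the per-message ordering guarantee; not a tested patch):

def update(self, x, who=None, metadata=None):
    # keep the upstream references alive until the broker confirms delivery
    self._retain_refs(metadata)

    def on_delivery(err, msg):
        self._release_refs(metadata)
        if err is not None:
            logger.error("Kafka delivery failed: %s", err)  # or retry / re-raise

    self.producer.produce(self.topic, x, callback=on_delivery)
    self.producer.poll(0)  # serve any delivery callbacks that are already due
    # no future is returned, so the source is free to emit the next item immediately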

roveo avatar Dec 04 '20 20:12 roveo

The source emits the next item only after the future it awaits on is resolved. But it is resolved only once poll is called and the previous callback is executed, so items are emitted and delivered one by one after each .poll() call, every 0.2 seconds.

Yes, this sounds exactly right! to_kafka also ensures that the order of the messages being delivered to Kafka is preserved.

chinmaychandak avatar Dec 05 '20 02:12 chinmaychandak

to_kafka also ensures that the order of the messages being delivered to Kafka is preserved

I think the client itself doesn't ensure that. It's stated in the docs that the order may change if some messages are retried.
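
(For what it's worth, librdkafka can be configured to keep ordering intact across retries; a hedged example, not something to_kafka sets today as far as I know:)

producer_conf = {
    "bootstrap.servers": "localhost:9092",
    # either of these preserves per-partition ordering when messages are retried
    "enable.idempotence": True,
    # "max.in.flight.requests.per.connection": 1,  # older alternative, costs throughput
}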

Yes, this sounds exactly right!

So would you say that it works as intended?

roveo avatar Dec 05 '20 08:12 roveo

So would you say that it works as intended?

Well, this is certainly an understandable mode: effectively we have a buffer of one. But I don't think the docstring describes this, so I would say it's lacking in at least that respect. I don't see any reason we can't let messages pile up in the producer buffer, since it will execute the callbacks for us. In the case where we don't want to reference count, this would be much faster, as @roveo points out, due to batching, but messages might be sent out of order.

There is not much in the documentation about message ordering, except this opaque statement, another gap that needs filling.

martindurant avatar Dec 07 '20 14:12 martindurant