streamz
to_kafka throughput
I'm testing the to_kafka sink and its throughput is limited by polltime (0.2 sec). It looks like self.producer.poll(0) only polls for one message at a time, so only one callback is called every 0.2 seconds.
This fails:
def test_to_kafka_throughput():
    ARGS = {'bootstrap.servers': 'localhost:9092'}
    with kafka_service() as kafka:
        _, TOPIC = kafka
        source = Stream.from_iterable(range(100)).map(lambda x: str(x).encode())
        kafka = source.to_kafka(TOPIC, ARGS)
        out = kafka.sink_to_list()
        source.start()
        wait_for(
            lambda: len(out) == 100,
            5,
            period=0.1,
            fail_func=lambda: print("len(out) ==", len(out))
        )
The existing test_to_kafka test doesn't catch this, because it starts waiting on the result only after all the items are emitted.
I spent some time tinkering with the code, but can't figure out what's wrong and how to fix this, so any ideas are appreciated.
cc @chinmaychandak
Yes, you're correct. Unlike the from_kafka_batched source, which makes it possible to read large batches of data from Kafka really fast over Dask, the to_kafka sink is really slow (it also does not work with Dask; users need to gather first). One of the things we were looking into was #279, but somehow that got pushed back.
Currently, we are using our own custom write_to_kafka function inside map, which can then be used over Dask as well and speeds things up by many orders of magnitude. This seems to be working really well in prod with high-speed data pipelines.
Example:
import confluent_kafka as ck

def write_to_kafka(x):
    producer = ck.Producer(producer_conf)
    for message_json in x:
        producer.produce(kafka_topic, message_json)
    # If there's a really large number of messages, flushing every 100k messages
    # or so makes things faster on the consumer side.
    producer.flush()
    return x  # recommended only for dev or monitoring purposes
output = Stream.from_kafka_batched(..., dask=True).map(aggregations).map(write_to_kafka).gather().sink_to_list()
Obviously, you will need to implement your own back-pressure logic in the custom writing function if need be.
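For illustration, here is a minimal sketch (not from this thread) of one way such back-pressure could look: confluent_kafka's produce() raises BufferError when the producer's local queue is full, so the writer can serve delivery callbacks via poll() until space frees up, then retry. The config, topic, and function name below are placeholders.

import confluent_kafka as ck

producer_conf = {'bootstrap.servers': 'localhost:9092'}  # placeholder config
kafka_topic = 'example-topic'                            # placeholder topic

def write_to_kafka_with_backpressure(batch):
    producer = ck.Producer(producer_conf)
    for message in batch:
        while True:
            try:
                producer.produce(kafka_topic, message)
                break
            except BufferError:
                # Local queue is full: block up to 1 s running delivery
                # callbacks, which frees queue space, then retry produce().
                producer.poll(1)
    producer.flush()
    return batch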
We need to spend some time on the default Kafka sink for sure; it's been too long since I've taken a look at it.
@chinmaychandak, is it a true statement that your batched approach offers much higher throughput, but the latency is similar (e.g., if you were to have a batch with a single message in it)?
@roveo, it seems that calling .flush() rather than .poll() would solve your failure.
@chinmaychandak, is it a true statement that your batched approach offers much higher throughput, but the latency is similar (e.g., if you were to have a batch with a single message in it)?
Yes, batching messages into the producer's local buffer queue and flushing to Kafka at reasonable intervals is definitely much faster, with higher throughput. Also, the speed-up is two-fold over Dask, since each worker can write its share of messages to Kafka in parallel.
it seems that calling .flush() rather than .poll() would solve your failure
But then, if I call this for each message, latency would be equal to the network round-trip time to the broker.
Yes, batching messages into the producer's local buffer queue and flushing to Kafka at reasonable intervals is definitely much faster, with higher throughput.
I was under the impression that this functionality is built into the producer: you can just call .produce() for each message, and it will buffer messages and send them when the buffer is full or at reasonable intervals. Something like partition with timeout, but inside ck.Producer itself. Am I wrong? (see confluent doc)
The same batching can be done with streamz, but I don't see why built-in buffering suddenly stops working inside a stream node.
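For reference, a hedged sketch (not part of this thread; the values are arbitrary examples) of that built-in buffering: linger.ms and batch.num.messages are librdkafka producer properties that control how long and how much the client batches before sending.

import confluent_kafka as ck

producer = ck.Producer({
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 100,             # wait up to 100 ms to fill a batch before sending
    'batch.num.messages': 10000,  # cap on the number of messages per batch
})

for i in range(1000):
    # produce() only appends to the producer's local buffer; a background
    # librdkafka thread ships batches when they fill up or linger.ms elapses.
    producer.produce('example-topic', str(i).encode())

producer.flush()  # block until everything buffered has been delivered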
Unlike the from_kafka_batched source that makes it possible to read large batches of data from Kafka really fast over Dask, the to_kafka sink is really slow
Yes, but I can't figure out why. This is not a performance issue: the latency per message is exactly 0.2 seconds. So the problem isn't that the client is slow or that it's not distributed. If I do this in a for loop with .produce, callbacks and everything, I get thousands of messages per second.
Am I wrong?
No, that's exactly what I'm doing in the example snippet: flush at reasonable intervals. Don't call flush for every message.
This is not a performance issue: the latency per message is exactly 0.2 seconds.
Yes, we need to change the to_kafka sink to something faster that uses produce and flush rather than poll, while keeping the back-pressure logic intact.
But .poll should just execute callbacks for new messages delivered to the broker, which should still be delivered even without force-flushing. Why is it slower than calling .flush?
import time
from threading import Thread

import confluent_kafka as ck

args = {"bootstrap.servers": "localhost:9092"}
pr = ck.Producer(args)

def poll():
    # Mimic to_kafka's background loop: run delivery callbacks every 0.2 s.
    while True:
        pr.poll()
        time.sleep(0.2)

counter = 0

def cb(err, msg):
    global counter
    counter += 1

th = Thread(target=poll)
th.start()

for i in range(100000):
    pr.produce("test", str(i).encode(), callback=cb)

wait_for(lambda: counter == 100000, 0.5)  # wait_for: the same helper used in the test above
print("Success")
This is essentially the same thing as in to_kafka, but it's lightning fast.
I think I get it.
The source emits the next item only after the future it awaits on is resolved. But it is resolved only once poll is called and the previous callback is executed, so items are emitted and delivered one by one after each .poll() call, every 0.2 seconds.
So to_kafka.update() shouldn't return a future, but should retain refs until the callback is called. If there's an error passed to the callback, it should retry or raise.
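To make the mechanism concrete, here is a deliberately simplified sketch of the behaviour described above. It is illustrative only, not the actual streamz implementation; the class and method names are made up to mirror the description.

import asyncio
import confluent_kafka as ck

POLLTIME = 0.2  # the 0.2 s interval discussed in this thread

class ToKafkaSketch:
    # Illustrative only; not the real streamz sink.
    def __init__(self, topic, conf):
        self.topic = topic
        self.producer = ck.Producer(conf)
        self.loop = asyncio.get_event_loop()

    async def poll_loop(self):
        # Background loop: runs pending delivery callbacks every POLLTIME seconds.
        while True:
            self.producer.poll(0)
            await asyncio.sleep(POLLTIME)

    def update(self, x):
        fut = self.loop.create_future()

        def on_delivery(err, msg):
            # Runs from inside producer.poll(); since the upstream awaits each
            # future, only one message is ever in flight at a time.
            if err:
                fut.set_exception(Exception(err))
            else:
                fut.set_result(msg)

        self.producer.produce(self.topic, x, callback=on_delivery)
        # The upstream awaits this future before emitting the next element,
        # which caps throughput at roughly one message per POLLTIME.
        return fut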
The source emits the next item only after the future it awaits on is resolved. But it is resolved only once poll is called and the previous callback is executed, so items are emitted and delivered one by one after each .poll() call, every 0.2 seconds.
Yes, this sounds exactly right! to_kafka also ensures that the order of the messages being delivered to Kafka is preserved.
to_kafka also ensures that the order of the messages being delivered to Kafka is preserved
I think the client itself doesn't ensure that. It's stated in the docs that the order may change if some messages are retried.
Yes, this sounds exactly right!
So would you say that it works as intended?
So would you say that it works as intended?
Well, this is certainly an understandable mode: effectively we have a buffer of one. But I don't think the docstring describes this, so I would say it's lacking in at least that respect. I don't see any reason that we can't have messages pile up in the producer buffer, since it will execute the callbacks for us. In the case that we don't want to reference count, this would be much faster, as @roveo points out, due to batching, but messages might be sent out of order.
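For illustration, a rough sketch of that alternative (not a proposed patch; the class name and error handling are invented here): let messages accumulate in the producer's buffer, use the delivery callback only to record failures, and never block the stream on a per-message future.

import confluent_kafka as ck

class BufferedKafkaSinkSketch:
    # Illustrative only: no per-message future, so no reference counting or back-pressure.
    def __init__(self, topic, conf):
        self.topic = topic
        self.producer = ck.Producer(conf)
        self.errors = []

    def _on_delivery(self, err, msg):
        if err is not None:
            # No future to reject; just record (or retry) the failure.
            self.errors.append(err)

    def update(self, x):
        # Returns immediately: librdkafka batches and ships in the background,
        # which is much faster, but ordering across retries is not guaranteed.
        self.producer.produce(self.topic, x, callback=self._on_delivery)
        self.producer.poll(0)  # run any delivery callbacks that are ready

    def flush(self):
        self.producer.flush()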
There is not much in the documentation about message ordering, except this opaque statement, another gap that needs filling.