Thread leaks
Problem
Phobos uses Thread.current[NAMESPACE] to store a reference to the async_producer.
Therefore, it causes thread (memory) leaks in multithreaded environments (Sidekiq, Puma).
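For illustration, the per-thread storage pattern looks roughly like this (a simplified sketch, not the exact Phobos source; the NAMESPACE value is a placeholder and the helper names are taken from the suggestion below):
NAMESPACE = :phobos_producer_store # placeholder name

def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  # The reference lives in Thread.current, so every thread that publishes
  # builds its own async producer, and that producer's background threads
  # keep running after the publishing thread finishes.
  Thread.current[NAMESPACE] ||= {}
  Thread.current[NAMESPACE][:async_producer] ||= client.async_producer(**async_configs)
end

def async_producer
  Thread.current[NAMESPACE] && Thread.current[NAMESPACE][:async_producer]
end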
Code to reproduce:
p Thread.list.size # => 5
Thread.new { Deimos::Backends::KafkaAsync.execute(producer_class: Struct.new(:topic).new, messages: []) }.join
p Thread.list.size # => 7
Thread.new { Deimos::Backends::KafkaAsync.execute(producer_class: Struct.new(:topic).new, messages: []) }.join
p Thread.list.size # => 9
Suggestion
As ruby-kafka is thread safe, it makes sense to have one instance of the async producer per process.
https://github.com/zendesk/ruby-kafka?tab=readme-ov-file#thread-safety
The asynchronous producer should be safe to use in a multi-threaded environment. This is because producers, when instantiated, get their own copy of any non-thread-safe data such as network sockets. Furthermore, the asynchronous producer has been designed in such a way that only a single background thread operates on this data, while any foreground thread with a reference to the producer object can only send messages to that background thread over a safe queue. Therefore it is safe to share an async producer object between many threads.
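For illustration, a minimal sketch of the pattern the quote describes, using the plain ruby-kafka API (broker address, topic, and delivery_interval are placeholders):
require 'kafka'

kafka = Kafka.new(['localhost:9092'])
producer = kafka.async_producer(delivery_interval: 10)

threads = 10.times.map do
  Thread.new do
    # Foreground threads only enqueue messages; a single background thread
    # owns the sockets and performs the actual delivery.
    producer.produce('payload', topic: 'events')
  end
end
threads.each(&:join)
producer.shutdown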
This fixes the leak, reduces memory usage, and potentially improves performance, as messages might be sent in batches.
This can be achieved by updating the create_async_producer and async_producer methods https://github.com/phobos/phobos/blob/master/lib/phobos/producer.rb#L141
as follows:
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  @async_producer = client.async_producer(**async_configs)
end

# @return [Kafka::AsyncProducer]
def async_producer
  @async_producer
end
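One caveat with a single process-wide instance: if two threads call create_async_producer at the same time, each could build its own producer. A minimal sketch of guarding the lazy initialization with a mutex, assuming the same Phobos helpers (kafka_client, configure_kafka_client, async_configs):
ASYNC_PRODUCER_MUTEX = Mutex.new

# @return [Kafka::AsyncProducer]
def async_producer
  # Memoize one producer per process; the mutex prevents two threads from
  # racing to create it simultaneously.
  ASYNC_PRODUCER_MUTEX.synchronize do
    @async_producer ||= begin
      client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
      client.async_producer(**async_configs)
    end
  end
end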
PRs welcome!
As ruby-kafka is thread safe, it makes sense to have one instance of the async producer per process.
Just keep in mind that historically the ruby-kafka producer had a lot of issues with message buffer limits being reached due to slow data dispatches. Reducing this to one producer per process means pushing way more messages through a single instance, which may cause crashes.
FYI, this was exactly the reason why I rewrote WaterDrop from ruby-kafka to librdkafka 4 years ago.
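For context, these are the ruby-kafka knobs involved when a single shared producer has to absorb every thread's traffic; the values below are illustrative, not recommendations:
require 'kafka'

kafka = Kafka.new(['localhost:9092'])

producer = kafka.async_producer(
  delivery_interval: 10,    # flush at least every 10 seconds
  delivery_threshold: 100,  # ...or once 100 messages are buffered
  max_queue_size: 1_000,    # pending messages allowed in the async queue
  max_buffer_size: 10_000   # beyond this the producer raises Kafka::BufferOverflow
)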
Just keep in mind that historically the ruby-kafka producer had a lot of issues with message buffer limits being reached due to slow data dispatches
Thank you for sharing! I will run performance tests comparing one async producer against the current setup and will share the results here.
Instead of trying to fix the Phobos async_producer, I followed @mensfeld's advice and tried WaterDrop as a Deimos backend.
I wrote a simple proxy class for the WaterDrop API, created an async Deimos backend, and overrode Deimos' determine_backend_class.
I don't see significant performance benefits so far. There is one factor which might affect rdkafka performance:
Deimos adds a partition_key to each message. If partition_key is set, rdkafka tries to map it to a partition number, and therefore needs to know the partition count from the Kafka metadata:
https://github.com/karafka/rdkafka-ruby/blob/v0.18.0/lib/rdkafka/producer.rb#L293
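For illustration, a sketch with rdkafka-ruby directly (broker and topic are placeholders): passing partition_key triggers the Ruby-side partition count lookup, while a plain key leaves partitioning to librdkafka:
require 'rdkafka'

producer = Rdkafka::Config.new('bootstrap.servers' => 'localhost:9092').producer

# partition_key: rdkafka-ruby maps the key to a partition itself, which needs
# the topic's partition count from metadata (cached with a 30-second TTL).
producer.produce(topic: 'events', payload: 'hello', partition_key: 'user-42')

# key only: partitioning happens inside librdkafka, no Ruby-side metadata call.
producer.produce(topic: 'events', payload: 'hello', key: 'user-42')

producer.flush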
rdkafka-ruby has a read-through cache for the Kafka metadata, with a 30-second TTL and no cache-miss storm prevention.
Metadata fetching causes publish delays of up to 2 seconds (even for async publishing).
I am going to add an async queue for WaterDrop produce_many_async calls. Hopefully, that will make async publishing with rdkafka faster than async publishing with ruby-kafka.
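A minimal sketch of that idea using a plain WaterDrop producer and concurrent-ruby (the class name and configuration are illustrative, not Deimos or WaterDrop internals):
require 'concurrent'
require 'waterdrop'

class AsyncWaterdropPublisher
  def initialize
    @producer = WaterDrop::Producer.new do |config|
      config.kafka = { 'bootstrap.servers': 'localhost:9092' }
    end
    # A single background thread drains the queue, so callers never block on
    # metadata fetches or delivery.
    @executor = Concurrent::SingleThreadExecutor.new
  end

  # messages: array of hashes such as { topic: 'events', payload: '...' }
  def publish(messages)
    @executor.post { @producer.produce_many_async(messages) }
  end
end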
@2rba do you mind providing benchmarks? Regarding the cache, I have a plan to improve it in the upcoming months by aligning it with the librdkafka metadata refresh layer and re-using the librdkafka data cache. This would pretty much eliminate the recheck hiccup. On top of that, there's already https://github.com/karafka/rdkafka-ruby/issues/402 and another card (that I can't find now) for an async, non-blocking refetch of all of this. FYI, if you do not rescale stuff often, you could easily increase this cache's TTL.
I benchmarked sync ruby-kafka vs sync rdkafka from a local environment via Deimos.
It was something like:
Benchmark.measure { Deimos::Producer.publish_list(9000.times.map { { a: 'a' * 1000 } }) }
- Deimos::Backends::Kafka: ~43 seconds
- Deimos::Backends::RdkafkaSync: ~10 seconds
rdkafka was roughly 4x faster.
Then I set up a production experiment for async sends. Half of the production web pods use ruby-kafka, and the other half uses rdkafka. The rdkafka messages are sent via a concurrent-ruby async thread (to avoid metadata fetch delays). The volume of messages: max 50 per second, average 16 per second. For rdkafka:
- Average CPU usage: slightly better (3%)
- Average memory usage: no difference detected
- Average Ruby GC time: slightly worse (3%)
- Average webserver backlog wait time: 4% worse (0.09ms)
- Average Deimos send duration (time to encode and enqueue): 12% better (0.75ms)
- Average Rack time duration: 3% better (8ms)
Production webservers are overprovisioned, and the volume of messages is quite small. Therefore, the rdkafka benefits are barely visible. Surprisingly, Rack time duration decreased, maybe because of less GVL lock time.
Oh nice! So now the main thing would be to get rid of the synchronous partition-key metadata refresh to prevent clogging.