
Thread leaks

Open 2rba opened this issue 11 months ago • 7 comments

Problem

phobos uses Thread.current[NAMESPACE] to save a reference to the async_producer. Therefore, it causes thread (memory) leaks in multithreaded environments (Sidekiq, Puma).
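For illustration, the mechanism is roughly the following. This is a simplified, self-contained sketch of the thread-local store pattern, not the actual phobos source; FakeAsyncProducer stands in for Kafka::AsyncProducer.

    # Simplified sketch (not the actual phobos code): each publishing thread
    # lazily builds its own async producer, and the producer's background
    # delivery thread keeps running after the publishing thread exits.
    class FakeAsyncProducer
      def initialize
        @queue = Queue.new
        @worker = Thread.new { loop { @queue.pop } } # background delivery thread
      end
    end

    NAMESPACE = :producer_store

    def async_producer
      store = (Thread.current[NAMESPACE] ||= {})
      store[:async_producer] ||= FakeAsyncProducer.new
    end

    3.times { Thread.new { async_producer }.join }
    p Thread.list.size # grows by one leaked background thread per publishing thread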

Code to reproduce

    p Thread.list.size # => 5
    Thread.new { Deimos::Backends::KafkaAsync.execute(producer_class: Struct.new(:topic).new, messages: []) }.join
    p Thread.list.size # => 7
    Thread.new { Deimos::Backends::KafkaAsync.execute(producer_class: Struct.new(:topic).new, messages: []) }.join
    p Thread.list.size # => 9

Suggestion

As ruby-kafka is thread-safe, it makes sense to have one instance of the async producer per process: https://github.com/zendesk/ruby-kafka?tab=readme-ov-file#thread-safety

The asynchronous producer should be safe to use in a multi-threaded environment. This is because producers, when instantiated, get their own copy of any non-thread-safe data such as network sockets. Furthermore, the asynchronous producer has been designed in such a way that only a single background thread operates on this data, while any foreground thread with a reference to the producer object can only send messages to that background thread over a safe queue. Therefore it is safe to share an async producer object between many threads.

This fixes the leak, reduces memory usage and potentially improves performance as messages might be sent in batches.

This can be achieved by updating the create_async_producer and async_producer methods (https://github.com/phobos/phobos/blob/master/lib/phobos/producer.rb#L141) as follows:

        # Memoize the producer in an instance variable (shared per process)
        # instead of in Thread.current, so each publishing thread does not
        # spawn its own background delivery thread.
        def create_async_producer
          client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
          @async_producer = client.async_producer(**async_configs)
        end

        # @return [Kafka::AsyncProducer]
        def async_producer
          @async_producer
        end
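One caveat if the producer becomes process-wide: plain ||= memoization is not atomic, so lazy creation from several threads could race and build two clients. Below is a minimal sketch of guarding creation with a lock; the Mutex is my addition, not part of the proposal above.

        ASYNC_PRODUCER_MUTEX = Mutex.new # addition: serializes lazy creation

        # Lazily build one async producer per process; the lock prevents two
        # threads from racing to create two Kafka clients on first use.
        def async_producer
          ASYNC_PRODUCER_MUTEX.synchronize do
            @async_producer ||= begin
              client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
              client.async_producer(**async_configs)
            end
          end
        end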

2rba · Jan 21 '25 21:01

PRs welcome!

dorner · Jan 22 '25 14:01

As ruby-kafka is thread-safe, it makes sense to have one instance of the async producer per process.

Just keep in mind that historically the ruby-kafka producer had a lot of issues with message buffer limits being reached due to slow data dispatches. Reducing this to one producer means a single instance handles way more messages, which may cause crashes.

FYI, exactly this was the reason I rewrote WaterDrop from ruby-kafka to librdkafka four years ago.
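For reference, the limits in question are the ruby-kafka async producer's queue and buffer settings; with a single shared producer every publishing thread feeds the same buffer, so slow deliveries hit these limits sooner. The values below are illustrative, not recommendations, and `kafka_client` is just an assumed handle to a Kafka::Client.

    # ruby-kafka async producer limits (illustrative values).
    # With one producer per process, all threads share this queue/buffer,
    # and slow delivery can raise Kafka::BufferOverflow sooner.
    producer = kafka_client.async_producer(
      max_queue_size: 1_000,    # pending messages allowed in the in-process queue
      max_buffer_size: 10_000,  # buffered messages before BufferOverflow is raised
      delivery_threshold: 100,  # flush once this many messages are buffered
      delivery_interval: 10     # flush at least every 10 seconds
    )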

mensfeld · Jan 22 '25 15:01

Just keep in mind that historically the ruby-kafka producer had a lot of issues with message buffer limits being reached due to slow data dispatches.

Thank you for sharing! I will run performance tests with one async producer versus the current setup and will share the results here.

2rba · Jan 23 '25 16:01

Instead of trying to fix the phobos async_producer, I followed @mensfeld's advice and tried WaterDrop as a Deimos backend. I wrote a simple proxy class for the WaterDrop API, created an async Deimos backend, and overrode Deimos's determine_backend_class.
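For context, a rough sketch of what such a proxy backend might look like; the class name, the execute hook, and the message fields are assumptions based on the Deimos::Backends::KafkaAsync call in the repro above, not the actual code from this experiment.

    require 'waterdrop'

    # Hypothetical Deimos backend that forwards to one process-wide WaterDrop
    # producer (which wraps librdkafka). Adjust names to the real Deimos API.
    module Deimos
      module Backends
        class RdkafkaAsync
          PRODUCER = WaterDrop::Producer.new do |config|
            config.kafka = { 'bootstrap.servers': 'localhost:9092' } # assumed broker list
          end

          def self.execute(producer_class:, messages:)
            PRODUCER.produce_many_async(
              messages.map do |m|
                { topic: producer_class.topic, payload: m.encoded_payload, key: m.key }
              end
            )
          end
        end
      end
    end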

I don't see significant performance benefits so far. There is one factor which might affect rdkafka performance: Deimos adds a partition_key to each message. If the partition_key is set, rdkafka tries to map it to a partition number, so it needs to know the partition count from the Kafka metadata (https://github.com/karafka/rdkafka-ruby/blob/v0.18.0/lib/rdkafka/producer.rb#L293). rdkafka has a read-through cache for the Kafka metadata with a 30-second TTL and no cache-miss storm prevention, and metadata fetching causes publish delays of up to 2 seconds (even with async publishing).

I am going to add an async queue for WaterDrop produce_many_async calls. Hopefully, that will make async publishing with rdkafka faster than with the async ruby-kafka producer.
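A sketch of that idea, assuming concurrent-ruby's SingleThreadExecutor; the class name is mine and this is not the code that ended up in the experiment.

    require 'concurrent'
    require 'waterdrop'

    # Hand batches to a single background executor so the calling (web) thread
    # never blocks on rdkafka's partition-count metadata fetch.
    class AsyncDispatcher
      def initialize(producer)
        @producer = producer
        @executor = Concurrent::SingleThreadExecutor.new
      end

      # messages: array of hashes like { topic:, payload:, key: }
      def dispatch(messages)
        @executor.post { @producer.produce_many_async(messages) }
      end
    end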

2rba · Feb 05 '25 21:02

@2rba would you mind providing benchmarks? In regards to the cache, I have a plan to improve it in the upcoming months by aligning it with the librdkafka metadata refresh layer and re-using the librdkafka data cache. This would pretty much eliminate the recheck hiccup. On top of that, there's already https://github.com/karafka/rdkafka-ruby/issues/402 and another card (that I can't find now) for an async, non-blocking refetch of all of this. FYI, if you do not rescale stuff often, you could easily increase this cache TTL.

mensfeld · Feb 05 '25 21:02

I benchmarked sync ruby-kafka vs. sync rdkafka from a local environment via Deimos. It was something like:

    Benchmark.measure { Deimos::Producer.publish_list(9000.times.map { { a: 'a' * 1000 } }) }

  • Deimos::Backends::Kafka: ~43 seconds
  • Deimos::Backends::RdkafkaSync: ~10 seconds

rdkafka is roughly 4x faster.

Then I set up a production experiment for async sends. Half of the production web pods use ruby-kafka, and the other half uses rdkafka. rdkafka messages are sent via a concurrent-ruby async thread (to avoid metadata fetch delays). The volume of messages: max 50 per second, average 16 per second. For rdkafka:

  • Average CPU usage: slightly better (3%)
  • Average memory usage: no difference detected
  • Average Ruby GC time: slightly worse (3%)
  • Average webserver backlog wait time: 4% worse (0.09ms)
  • Average Deimos send duration (time to encode and enqueue): 12% better (0.75ms)
  • Average Rack time duration: 3% better (8ms)

Production webservers are overprovisioned, and the volume of messages is quite small. Therefore, the rdkafka benefits are barely visible. Surprisingly, Rack time duration decreased, maybe because of less GVL lock time.

2rba · Feb 07 '25 10:02

Oh nice! So now the main thing would be to get rid of the synchronous partition-key metadata refresh to prevent clogging.

mensfeld · Feb 07 '25 10:02