phobos
Close the producer regardless of `@producer_enabled`
Hi phobos team,
From reading the current code, the producer is only closed when `@producer_enabled` is set:
https://github.com/phobos/phobos/blob/c9534923d0eb7460ecf74fba34526545c4220e90/lib/phobos/listener.rb#L115-L118
This assumes we have a class that extends both Listener and Producer. In our code, we have something like:
class KafkaProducer
  include Phobos::Producer
end
...and it is used inside the consumer/listener, but it doesn't get closed when we restart the consumer.
1) What's the problem if we don't close the producer connection during deployment?
2) Could we go through `ObjectSpace`, find all the classes that include `Phobos::Producer`, and close them?
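The `ObjectSpace` idea above can be sketched as follows. This is a self-contained illustration, not Phobos code: `ProducerMixin` is a hypothetical stand-in for `Phobos::Producer`, and `shutdown!`/`shut_down?` are made-up methods that only record the call. Note that it only finds the classes; it doesn't address which thread actually holds each connection.

```ruby
# Hypothetical stand-in for Phobos::Producer (the real gem is not loaded here).
module ProducerMixin
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Hypothetical shutdown hook: records the call instead of closing a connection.
    def shutdown!
      @shut_down = true
    end

    def shut_down?
      @shut_down == true
    end
  end
end

class KafkaProducer
  include ProducerMixin
end

class UnrelatedClass; end

# Walk every live Class, skip singleton classes, keep those that include the mixin.
producer_classes = ObjectSpace.each_object(Class)
                              .reject(&:singleton_class?)
                              .select { |klass| klass.include?(ProducerMixin) }

producer_classes.each(&:shutdown!)
puts producer_classes.inspect
```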
Thanks!
By default, producers are only opened and closed when you produce:
def publish_list(messages)
  producer = sync_producer || create_sync_producer
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
end
If you set `persistent_connections` to true, the producer will not be shut down automatically. In that case, it's up to you to shut it down manually when your process exits, e.g.:
at_exit do
  MyProducer.producer.sync_producer_shutdown
end
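To make the two modes concrete, here is a simplified stand-in that mirrors the `ensure`/`unless` shape of `publish_list` above. `FakeKafkaProducer` and the keyword arguments are hypothetical, for illustration only; they are not Phobos internals.

```ruby
# Hypothetical stand-in for a Kafka client producer; it only tracks its own state.
class FakeKafkaProducer
  attr_reader :delivered

  def initialize
    @delivered = []
    @open = true
  end

  def deliver(messages)
    @delivered.concat(messages)
  end

  def shutdown
    @open = false
  end

  def open?
    @open
  end
end

# Simplified mirror of publish_list: the ensure clause closes the producer
# unless persistent connections are enabled.
def publish_list(messages, producer:, persistent_connections:)
  producer.deliver(messages)
ensure
  producer&.shutdown unless persistent_connections
end

short_lived = FakeKafkaProducer.new
publish_list(%w[a b], producer: short_lived, persistent_connections: false)

long_lived = FakeKafkaProducer.new
publish_list(%w[c], producer: long_lived, persistent_connections: true)

# With persistent connections, closing the producer is the caller's job.
at_exit { long_lived.shutdown }

puts "short-lived open? #{short_lived.open?}"
puts "long-lived open?  #{long_lived.open?}"
```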
Yeah, sorry I didn't explain the use case clearly in the description:
- We're using `persistent_connections`
- The producing happens inside the consumer
> at_exit do MyProducer.producer.sync_producer_shutdown end
If I understand correctly, this needs to happen in the same thread that does the producing. Where do you recommend putting this logic for a phobos consumer? Right now my approach is:
module Phobos
  class Listener
    alias original_stop_listener stop_listener

    def stop_listener
      original_stop_listener
      MyProducer.producer.sync_producer_shutdown
      MyProducer.producer.async_producer_shutdown
    end
  end
end
That's why I proposed putting this logic directly inside the existing `stop_listener` instead of monkey-patching it.
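As an aside on the monkey patch itself, `Module#prepend` is a common alternative to `alias`: the original method keeps its name and is reached with `super`. The sketch below uses a hypothetical `DemoListener` in place of `Phobos::Listener`, and records events instead of calling the real `sync_producer_shutdown`/`async_producer_shutdown`; it shows the pattern only.

```ruby
# Hypothetical stand-in for Phobos::Listener; only stop_listener matters here.
class DemoListener
  attr_reader :events

  def initialize
    @events = []
  end

  def stop_listener
    @events << :listener_stopped
  end
end

# Prepended module: its stop_listener runs first and reaches the original via super.
module ShutdownProducersOnStop
  def stop_listener
    super
    # The real patch would call the producer shutdown methods here;
    # this sketch only records that the step ran.
    @events << :producers_shut_down
  end
end

DemoListener.prepend(ShutdownProducersOnStop)

listener = DemoListener.new
listener.stop_listener
puts listener.events.inspect
```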
One option is for you to define your producer classes outside the listener, e.g. in an initializer (if you're running Rails), a railtie, etc. This means your consumers would share the producer objects and you could shut them down from a single place.
Ah, I just realized Phobos has a boot option that lets you define code to run before the threading starts, so you could define it there.
For our other Rails/Sinatra applications, we handle this with Puma's `on_worker_shutdown` hook, and it works fine.
For publishing inside the phobos consumer, though, I'm not sure how to share the producer across threads (we're using `max_concurrency: 3` here, but the issue is the same with 1). For example, I have the following in `phobos_boot.rb`:
puts "boot thread id: #{Thread.current.object_id}"
at_exit do
  puts "Inside at_exit"
  MyProducer.producer.sync_producer_shutdown
  puts "Done at_exit"
end
This is the output:
boot thread id: 70240352542780
{"msg":"Start Listener thread_id: 70240357425020"}
{"msg":"Start Listener thread_id: 70240357422220"}
{"msg":"Start Listener thread_id: 70240357418780"}
...
// stop
{"msg":"Stop Listener.... 70240357422220"}
sync_producer_shutdown: 70240357422220, true
{"msg":"Stop Listener.... 70240357425020"}
sync_producer_shutdown: 70240357425020, true
{"msg":"Stop Listener.... 70240357418780"}
sync_producer_shutdown: 70240357418780, false // this is checking if `producer_store[:sync_producer].nil?`
Inside at_exit
sync_producer_shutdown: 70240352542780, true
Done at_exit
So the boot thread isn't the same thread as the ones that store the producers. Any thoughts on how to address this? Thanks!
You first need to actually create the producer before starting the listeners. It's not enough to put the shutdown in the `at_exit` block; you also have to initialize the producer there by calling `create_sync_producer` or `create_async_producer`.
Hm, are you saying it's actually the same producer across all threads, including the main phobos thread?
Hmm... now that I'm looking at it, I think you're right. It would be stored in thread variables, so sharing the object reference wouldn't share the producer. I'm not sure why I've never come across this, because we also do producing inside our consumers. 😮
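The log above comes down to thread-local storage: a value stored under `Thread.current[...]` in one thread is not visible from any other thread. A minimal sketch, assuming a per-thread hash similar in spirit to the `producer_store` mentioned in the log (the key names here are made up):

```ruby
# Hypothetical per-thread store, similar in spirit to producer_store.
def producer_store
  Thread.current[:demo_producer_store] ||= {}
end

# The "boot"/main thread creates its own producer.
producer_store[:sync_producer] = "producer created in main thread"

# Each listener-like thread starts with an empty store and creates its own producer.
listener_views = Array.new(3) do
  Thread.new do
    before = producer_store[:sync_producer]
    producer_store[:sync_producer] = "producer for thread #{Thread.current.object_id}"
    before
  end
end.map(&:value)

puts "main thread sees:     #{producer_store[:sync_producer].inspect}"
puts "listener threads saw: #{listener_views.inspect}"
```

This is why the `at_exit` block in the boot thread finds no producer to shut down: it only ever sees the main thread's store.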
I'm down for a PR to add a callback before a listener shuts down so you can add code to shut down your producers there.
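To illustrate why such a callback would help: if the hook runs inside each listener's own thread, it can reach that thread's producer. This is a hypothetical sketch; `CallbackListener` and `before_stop` are made-up names for the proposed callback, and the producer is a stand-in string.

```ruby
# Hypothetical listener that runs a before-stop callback in its own thread.
class CallbackListener
  def initialize(before_stop:)
    @before_stop = before_stop
    @stop_signal = Queue.new
  end

  def start
    @thread = Thread.new do
      # Each listener thread creates its own (stand-in) producer.
      Thread.current[:producer] = "producer-#{Thread.current.object_id}"
      @stop_signal.pop   # block until stop is requested
      @before_stop.call  # same thread, so Thread.current[:producer] is visible
    end
    self
  end

  def stop
    @stop_signal << :stop
    @thread.join
  end
end

closed = Queue.new
hook = -> { closed << Thread.current[:producer] }

listeners = Array.new(3) { CallbackListener.new(before_stop: hook).start }
listeners.each(&:stop)

closed_producers = Array.new(closed.size) { closed.pop }
puts closed_producers.inspect
```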
Cool, yeah, a callback sounds better than looping over `ObjectSpace`.
> because we also do producing inside our consumers.
Btw, back to my original question 1): do you know the downside of not closing the connection? I think the broker may close the idle connection after a certain time anyway?
Yeah, I don't know of any ramifications of not closing the producer. I think you're right: the broker should close the idle connection after some time. The relevant broker setting is `connections.max.idle.ms`.
> For other application rails/sinatra, we handled it by puma on_worker_shutdown hook and it works fine.
Btw, I reviewed this today and realized I made a mistake: `on_worker_shutdown` only has access to the worker context, and each worker can have multiple threads, so it seems impossible to shut down the per-thread producers this way.