
Close producer no matter `if @producer_enabled` or not

Open · renjiexu opened this issue 4 years ago · 11 comments

Hi phobos team,

Reading the current code, the listener only closes the producer when `@producer_enabled` is set:

https://github.com/phobos/phobos/blob/c9534923d0eb7460ecf74fba34526545c4220e90/lib/phobos/listener.rb#L115-L118
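To make the described behavior concrete, here is a simplified, self-contained sketch of that check. It is not the phobos source: `FakeHandler`, `FakeProducer`, `SketchListener` and `producer_shutdown` are hypothetical stand-ins, and deriving the flag from the handler class's ancestors is an assumption. It shows why a separate producer class used from inside a handler is never closed by the listener.

```ruby
# Stand-ins for Phobos::Handler and Phobos::Producer, NOT the real phobos modules.
module FakeHandler; end

module FakeProducer
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def shutdown_calls
      @shutdown_calls ||= 0
    end

    # Hypothetical: records that this class's producer was shut down.
    def producer_shutdown
      @shutdown_calls = shutdown_calls + 1
    end
  end
end

class SketchListener
  def initialize(handler_class)
    @handler_class = handler_class
    # Assumption: the flag is true only when the handler class itself includes the producer module.
    @producer_enabled = handler_class.ancestors.include?(FakeProducer)
  end

  def stop_listener
    @handler_class.producer_shutdown if @producer_enabled
  end
end

class HandlerWithProducer
  include FakeHandler
  include FakeProducer
end

class HandlerOnly
  include FakeHandler
end

# A separate producer class, used from inside HandlerOnly's consume logic.
class KafkaProducer
  include FakeProducer
end

SketchListener.new(HandlerWithProducer).stop_listener
SketchListener.new(HandlerOnly).stop_listener

puts HandlerWithProducer.shutdown_calls # => 1
puts KafkaProducer.shutdown_calls       # => 0 (never closed by the listener)
```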

This assumes the handler class itself includes Phobos::Producer (i.e. one class acts as both listener and producer). In our code, we have something like:

class KafkaProducer
  include Phobos::Producer
end

...and it is used inside the consumer/listener, but its producer is not closed when we restart the consumer.

  1. What's the downside if we don't close the producer connection during deployment?
  2. Could we go through ObjectSpace, find all the classes that include Phobos::Producer, and close them?
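Question 2 could be approached with Ruby's built-in `ObjectSpace.each_object(Class)`. A minimal sketch, with `ProducerModule` as a hypothetical stand-in for Phobos::Producer; note that it only finds classes that are already loaded, so lazily autoloaded producer classes would be missed.

```ruby
# Stand-in for Phobos::Producer (hypothetical; the real module lives in the phobos gem).
module ProducerModule; end

class KafkaProducer
  include ProducerModule
end

class AuditProducer
  include ProducerModule
end

class Unrelated; end

# Returns every loaded class whose ancestors include `mod`.
# `klass < mod` is true when klass includes mod, nil when they are unrelated.
def producer_classes(mod)
  ObjectSpace.each_object(Class).select { |klass| klass < mod }
end

puts producer_classes(ProducerModule).map(&:name).sort.inspect
# => ["AuditProducer", "KafkaProducer"]
```

Each found class could then have its producer shut down, with the caveat that this only reaches whatever producer state is visible from the thread doing the loop.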

Thanks!

renjiexu, Jun 10 '20 17:06

By default, producers are only opened and closed when you produce:

def publish_list(messages)
  producer = sync_producer || create_sync_producer
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
end
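The `ensure` pattern above can be exercised in isolation. This is a minimal sketch with a hypothetical `FakeKafkaProducer` in place of the real client and a `persistent_connections:` keyword in place of the Phobos config lookup; it shows that a producer is only left open when persistent connections are enabled.

```ruby
# Hypothetical stand-in for a Kafka producer connection (not a phobos class).
class FakeKafkaProducer
  attr_reader :delivered

  def initialize
    @delivered = []
    @shut_down = false
  end

  def produce(message)
    @delivered << message
  end

  def shutdown
    @shut_down = true
  end

  def shut_down?
    @shut_down
  end
end

# Same shape as publish_list above: produce, then shut down in `ensure`
# unless persistent connections are enabled. `&.` guards the case where
# creating the producer itself raised and `producer` is still nil.
def publish_list(messages, persistent_connections:)
  producer = FakeKafkaProducer.new
  messages.each { |message| producer.produce(message) }
  producer # the ensure clause does not change this return value
ensure
  producer&.shutdown unless persistent_connections
end

short_lived = publish_list(["a"], persistent_connections: false)
puts short_lived.shut_down? # => true

persistent = publish_list(["b"], persistent_connections: true)
puts persistent.shut_down?  # => false
```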

If you have persistent_connections set to true, the producer will not be shut down automatically. In that case, it's up to you to shut it down manually when your process exits, e.g.:

at_exit do
  MyProducer.producer.sync_producer_shutdown
end

dorner, Jun 10 '20 17:06

Yeah, sorry, I didn't explain the use case clearly in the description:

  1. We're using persistent_connections
  2. The producing happens inside the consumer

at_exit do
  MyProducer.producer.sync_producer_shutdown
end

If I understand correctly, this needs to happen inside the same thread that does the producing. Where would you recommend putting this logic for a Phobos consumer? Right now my approach is:

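module Phobos
  class Listener
    # Keep a reference to the original implementation before overriding it
    alias original_stop_listener stop_listener

    def stop_listener
      original_stop_listener

      # Shut down the producers once the listener itself has stopped
      MyProducer.producer.sync_producer_shutdown
      MyProducer.producer.async_producer_shutdown
    end
  end
end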

That's why I proposed putting this logic directly inside the existing stop_listener instead of monkey-patching it.
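If a patch is needed in the meantime, standard Ruby's `Module#prepend` avoids the alias by keeping the original method reachable through `super`. A self-contained sketch: `Listener` and `ProducerHandle` are stand-ins (in a real app the module would be prepended to Phobos::Listener and call MyProducer.producer, as in the snippet above).

```ruby
# Stand-in for Phobos::Listener; in a real app you would prepend to that class.
class Listener
  def stop_listener
    @stopped = true
  end

  def stopped?
    !!@stopped
  end
end

# Hypothetical stand-in for MyProducer.producer.
class ProducerHandle
  attr_reader :closed

  def initialize
    @closed = []
  end

  def sync_producer_shutdown
    @closed << :sync
  end

  def async_producer_shutdown
    @closed << :async
  end
end

PRODUCER = ProducerHandle.new

# With Module#prepend, `super` reaches the original stop_listener,
# so no alias name is needed.
module ShutdownProducersOnStop
  def stop_listener
    super
    PRODUCER.sync_producer_shutdown
    PRODUCER.async_producer_shutdown
  end
end

Listener.prepend(ShutdownProducersOnStop)

listener = Listener.new
listener.stop_listener
puts listener.stopped?       # => true
puts PRODUCER.closed.inspect # => [:sync, :async]
```

The behavior matches the alias version; prepend just avoids renaming the original method and composes better if another library patches the same method.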

renjiexu, Jun 10 '20 17:06

One option is for you to define your producer classes outside the listener, e.g. in an initializer (if you're running Rails), a railtie, etc. This means your consumers would share the producer objects and you could shut them down from a single place.

dorner, Jun 10 '20 17:06

Ah, I just realized Phobos has a boot option that lets you define code to run before the listener threads start. So you could define it there.

dorner, Jun 10 '20 17:06

For our other applications (Rails/Sinatra), we handled it with Puma's on_worker_shutdown hook and it works fine.

For publishing inside the Phobos consumer, though, I'm not sure how to share the producer across threads (we're using max_concurrency: 3 here, but the same issue occurs with 1). For example, I have the following in phobos_boot.rb:

puts "boot thread id: #{Thread.current.object_id}"

at_exit do
  puts "Inside at_exit"
  MyProducer.producer.sync_producer_shutdown
  puts "Done at_exit"
end

This is the output:

boot thread id: 70240352542780
{"msg":"Start Listener thread_id: 70240357425020"}
{"msg":"Start Listener thread_id: 70240357422220"}
{"msg":"Start Listener thread_id: 70240357418780"}

...
// stop
{"msg":"Stop Listener.... 70240357422220"}
sync_producer_shutdown: 70240357422220, true
{"msg":"Stop Listener.... 70240357425020"}
sync_producer_shutdown: 70240357425020, true
{"msg":"Stop Listener.... 70240357418780"}
sync_producer_shutdown: 70240357418780, false // this is checking if `producer_store[:sync_producer].nil?`
Inside at_exit
sync_producer_shutdown: 70240352542780, true
Done at_exit

So the boot thread is not the one that stores those producers. Any thoughts on how to address this? Thanks!

renjiexu, Jun 10 '20 18:06

You first need to actually create the producer before starting up the listeners. Adding the at_exit block is not enough; you also have to initialize the producer there by calling create_sync_producer or create_async_producer.

dorner, Jun 10 '20 18:06

Hm? Are you saying it's actually the same producer shared among all threads, including the main Phobos thread?

renjiexu, Jun 10 '20 18:06

Hmm... now that I'm looking at it, I think you're right. The producer is stored in thread-local variables, so sharing the object reference wouldn't share the producer. I'm not sure why I've never come across this, because we also produce inside our consumers. 😮
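A minimal sketch of why the boot-time producer is not the one the listeners use, assuming producers live in per-thread storage as described above. `MyProducer` and its methods are hypothetical stand-ins; the sketch uses `Thread.current[]`, which Ruby scopes per fiber (and therefore per thread here).

```ruby
# Hypothetical producer class whose store lives in Thread.current[],
# so every thread (strictly, every fiber) gets its own store.
class MyProducer
  def self.producer_store
    Thread.current[:my_producer_store] ||= {}
  end

  def self.sync_producer
    producer_store[:sync_producer]
  end

  def self.create_sync_producer
    producer_store[:sync_producer] = Object.new # stand-in for a real producer
  end
end

# "Boot" code on the main thread creates a producer there.
MyProducer.create_sync_producer
main_producer = MyProducer.sync_producer

# Each "listener" thread starts with an empty store and creates its own producer.
results = Array.new(3) do
  Thread.new do
    seen_before = MyProducer.sync_producer
    MyProducer.create_sync_producer
    [seen_before, MyProducer.sync_producer]
  end
end.map(&:value)

results.each do |seen_before, own|
  puts seen_before.nil?          # => true (the main thread's producer is invisible)
  puts own.equal?(main_producer) # => false
end
```

An at_exit hook running on the main thread would therefore only shut down `main_producer`, which matches the log output above.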

I'm down for a PR to add a callback before a listener shuts down so you can add code to shut down your producers there.
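The proposed callback could look roughly like the following. Everything here is hypothetical (`HookedListener`, `before_stop`, the `:producer` thread key); it is not an existing phobos API. In this sketch `stop_listener` is called from each listener thread, so the registered hook runs on that thread and can reach that thread's own producer.

```ruby
# Hypothetical listener with a class-level "before stop" callback list.
class HookedListener
  @before_stop_hooks = []

  class << self
    attr_reader :before_stop_hooks

    def before_stop(&block)
      @before_stop_hooks << block
    end
  end

  def initialize(id)
    @id = id
  end

  # Runs every registered hook on the calling (listener) thread.
  def stop_listener
    self.class.before_stop_hooks.each { |hook| hook.call(@id) }
  end
end

closed = Queue.new

HookedListener.before_stop do |id|
  # In a real app, this is where each thread would shut down its own producers.
  closed << [id, Thread.current[:producer]]
end

threads = 3.times.map do |i|
  Thread.new do
    Thread.current[:producer] = "producer-#{i}"
    HookedListener.new(i).stop_listener
  end
end
threads.each(&:join)

results = []
results << closed.pop until closed.empty?
puts results.sort.inspect
# => [[0, "producer-0"], [1, "producer-1"], [2, "producer-2"]]
```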

dorner, Jun 10 '20 18:06

Cool, yeah, a callback sounds better than looping over ObjectSpace.

Quoting: "because we also do producing inside our consumers."

btw, back to the original question 1): do you know the downside of not closing the connection? I think the broker may close the idle connection after a certain time anyway?

renjiexu, Jun 10 '20 20:06

Yeah, I don't know of any ramifications of not closing the producer. I think you're right: the broker should close the idle connection after some time. The relevant setting is `connections.max.idle.ms`.

dorner, Jun 10 '20 20:06

Quoting my earlier comment: "For other application rails/sinatra, we handled it by puma on_worker_shutdown hook and it works fine."

btw, I reviewed this today and realized I made a mistake: on_worker_shutdown only has access to the worker context, and each worker can run multiple threads, so it seems impossible to shut down each thread's producer from that hook.
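One possible workaround, sketched under assumptions: each thread registers the producer it creates in a shared, mutex-guarded registry, and a single hook (at_exit, on_worker_shutdown) closes them all later. `ProducerRegistry` and `FakeProducer` are hypothetical, and the sketch assumes the real producer tolerates being shut down from a thread other than the one that created it, once that thread has stopped producing; check the Kafka client's thread-safety notes before relying on this.

```ruby
# Hypothetical registry shared by all threads in the process.
class ProducerRegistry
  def initialize
    @mutex = Mutex.new
    @producers = []
  end

  def register(producer)
    @mutex.synchronize { @producers << producer }
    producer
  end

  # Takes ownership of everything registered so far and shuts it down.
  def shutdown_all
    to_close = @mutex.synchronize { @producers.dup.tap { @producers.clear } }
    to_close.each(&:shutdown)
    to_close.size
  end
end

# Stand-in for a Kafka producer.
FakeProducer = Struct.new(:name) do
  def shutdown
    @closed = true
  end

  def closed?
    !!@closed
  end
end

REGISTRY = ProducerRegistry.new

producers = 3.times.map do |i|
  Thread.new { REGISTRY.register(FakeProducer.new("producer-#{i}")) }
end.map(&:value)

# e.g. from at_exit or a worker-shutdown hook, after the threads are done
closed_count = REGISTRY.shutdown_all
puts closed_count              # => 3
puts producers.all?(&:closed?) # => true
```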

renjiexu, Jun 11 '20 18:06