krakow
Producer Not Terminating
I'm having an issue when I try to write payloads in increments. The simplest setup I could come up with was this:
require 'krakow'
require 'securerandom'

def run
  producer = Krakow::Producer.new(
    host: 'HOST',
    port: '4150',
    topic: 'process'
  )

  # 100,000 random 24-character hex IDs to publish
  user_ids = []
  100_000.times { user_ids << SecureRandom.hex(12) }

  # Publish in slices of 50,000; each write sends the whole slice in one call
  user_ids.each_slice(50_000).with_index do |usrs, i|
    puts "Publishing #{i * 50_000} - #{(i + 1) * 50_000} users to NSQ"
    producer.write(*usrs)
  end

  producer.goodbye_my_love!
end

run
When I do this, the output looks something like:
I, [2015-03-19T12:18:14.505000 #46528] INFO -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Establishing connection to: HOST:PORT
D, [2015-03-19T12:18:16.269000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Initializing connection
D, [2015-03-19T12:18:16.533000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Read wait for frame start
D, [2015-03-19T12:18:16.562000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: <<< "\x00\x00\x01\f\x00\x00\x00\x00"
D, [2015-03-19T12:18:16.568000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Decoded structure: {:size=>264, :type=>0}
D, [2015-03-19T12:18:16.569000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: <<< "{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"
D, [2015-03-19T12:18:16.571000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Struct: {:size=>264, :type=>0, :data=>"{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"} Frame: <Krakow::FrameType::Response:2152 [{:response=>"{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"}]>
I, [2015-03-19T12:18:16.584000 #46528] INFO -- : <Krakow::Connection:2150 {HOST:PORT}>: Connection settings: {:max_rdy_count=>2500, :version=>"0.3.2", :max_msg_timeout=>900000, :msg_timeout=>60000, :tls_v1=>false, :deflate=>false, :deflate_level=>0, :max_deflate_level=>6, :snappy=>false, :sample_rate=>0, :auth_required=>false, :output_buffer_size=>16384, :output_buffer_timeout=>250}
I, [2015-03-19T12:18:16.585000 #46528] INFO -- : <Krakow::Connection:2150 {HOST:PORT}>: Connection initialized
D, [2015-03-19T12:18:16.590000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Read wait for frame start
I, [2015-03-19T12:18:16.601000 #46528] INFO -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Connection established: <Krakow::Connection:2150 {HOST:PORT}>
Publishing 0 - 50000 users to NSQ
D, [2015-03-19T12:18:16.740000 #46528] DEBUG -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Multiple message publish
The process just hangs until I manually kill it from the terminal. It never reaches the second iteration of the loop (only 50k messages get pushed), and it never reaches the goodbye_my_love! terminator. Any idea what I might be doing wrong?
I'm on JRuby 1.7.19, using version 0.3.12 of the gem.
Hi! goodbye_my_love! is the actor's finalizer method and generally shouldn't be called directly. To shut the producer down, terminate the actor instead:
producer.terminate
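To make sure terminate runs even when a write raises, the publishing code can be wrapped in an ensure clause. This is a minimal sketch that assumes only that the producer responds to terminate (as Krakow::Producer does here); with_producer is a hypothetical helper name, not part of the gem.

```ruby
# Hypothetical helper (not part of Krakow): yields the producer, then
# terminates it however the block exits, including when an exception is raised.
def with_producer(producer)
  yield producer
ensure
  # terminate shuts down the actor; goodbye_my_love! is only its finalizer
  producer.terminate
end
```

With the gem, this would be called as with_producer(Krakow::Producer.new(...)) { |producer| producer.write(*usrs) }, so the actor is shut down whether the writes succeed or fail.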
Also, by default the producer waits for a confirmation of delivery on each write. If you are pushing that many messages at once, you may want more of a "fire and forget" approach. You can get that by disabling the wait on response:
Krakow::Producer.new(
  :host => HOST,
  ...,
  :connection_options => {
    :options => {
      :response_wait => 0
    }
  }
)
You can also try looping through the messages and doing single writes through the producer, or reducing the number of messages sent in each mpub (i.e., use a smaller slice size on each iteration).
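Reducing the slice size can be factored into a small helper. This is a sketch assuming only that the producer accepts write(*messages), as in the original example; publish_in_batches is a hypothetical name, and the 2,500 default is just a starting point to tune, not a limit documented by Krakow or nsqd.

```ruby
# Hypothetical helper (not part of Krakow): publishes +messages+ through
# +producer+ in batches of at most +batch_size+, one write call per batch.
# Returns the number of batches written.
def publish_in_batches(producer, messages, batch_size: 2_500)
  raise ArgumentError, "batch_size must be positive" unless batch_size.positive?

  batches = 0
  messages.each_slice(batch_size) do |batch|
    producer.write(*batch)
    batches += 1
  end
  batches
end
```

Smaller batches mean more round trips, but each one waits on a smaller confirmation, which is the trade-off being suggested here.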
Thanks @chrisroberts for the quick reply. Setting response_wait: 0 did work, but it's a little unsafe for us in a production environment. I was able to get it working with slices of roughly 2,500 messages, which will have to do for now.
I'll dig through and see where the bottleneck is (maybe our nsqd instance?). For reference, without setting response_wait to 0, I'm able to push over 150k messages in one write and watch them get written to disk in nsqadmin, but then the producer hangs indefinitely. Maybe it missed the confirmation, or perhaps the confirmation was never sent...
I added a test for pushing large payloads via mpub, based on your failing example. You may need to increase response_wait so that nsqd has enough time to fully accept the payload and send back confirmation of receipt. If no response arrives in that time (and response_wait is > 0), the producer will raise an exception (which may be getting caught and suppressed somewhere in your use case?).
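The confirm-or-raise behaviour described above can be modelled in plain Ruby. This is not Krakow's implementation: it is a sketch assuming confirmations arrive on a Queue, with Timeout from the standard library standing in for whatever the gem does internally; await_confirmation and ResponseTimeout are hypothetical names. A response_wait of 0 skips the wait entirely, mirroring the "fire and forget" option mentioned earlier in the thread.

```ruby
require 'timeout'

# Hypothetical error class for this sketch (not Krakow's own exception).
class ResponseTimeout < StandardError; end

# Waits up to +response_wait+ seconds for a confirmation on +responses+.
# 0 means "don't wait at all"; otherwise a missing response raises ResponseTimeout.
def await_confirmation(responses, response_wait)
  return :not_waited if response_wait.zero?

  Timeout.timeout(response_wait, ResponseTimeout) { responses.pop }
end
```

If code like this runs inside a rescue that swallows StandardError, the timeout disappears silently, which is one way a failed confirmation could go unnoticed.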
Tests pass on various MRI and JRuby versions, so if you are still seeing issues after tweaking the options, please let me know and I'll see whether I can reproduce the behavior you're encountering locally.
Cheers!