input workers exception handling
There are cases of Logstash crashing completely because of an exception raised inside an input sub-thread (a thread created by the input plugin to add concurrency to its input duties).
This is troubling because the pipeline `inputworker` wraps `plugin.run` in a `begin ... rescue => e` statement, so any exception left unrescued by an input plugin should always be rescued in the pipeline `inputworker`.
I was able to reproduce this behaviour when the exception is raised from within one of the input's sub-threads. Interestingly, the outcome depends on the order in which the sub-threads are joined.
Here's reproduction code that mimics the Logstash thread initialization sequence and exception handling:
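```ruby
# Re-raise any unhandled thread exception in the main thread,
# and don't print per-thread exception reports.
Thread.abort_on_exception = true
Thread.report_on_exception = false

# Input sub-thread that keeps running (a long-lived worker).
def plugin_worker2
  begin
    puts("plugin_worker2 start sleep")
    sleep(60)
    puts("plugin_worker2 end sleep")
  ensure
    puts("plugin_worker2 ensure")
  end
end

# Input sub-thread that fails after one second.
def plugin_worker1
  begin
    sleep(1)
    puts("plugin_worker1 raising exception")
    raise("foo")
  ensure
    puts("plugin_worker1 ensure")
  end
end

# Stands in for the input plugin's run method: spawns sub-threads and joins them.
def plugin_run
  worker_thread1 = Thread.new { plugin_worker1 }
  worker_thread2 = Thread.new { plugin_worker2 }
  # The still-running worker_thread2 is joined first.
  worker_thread2.join
  worker_thread1.join
end

# Stands in for the pipeline inputworker: runs the plugin and rescues its errors.
def input_worker
  begin
    plugin_thread = Thread.new { plugin_run }
    plugin_thread.join
  rescue => e
    puts("input_worker rescue #{e.inspect}")
  ensure
    puts("input_worker ensure")
  end
end

# Stands in for the pipeline starting its input thread.
def pipeline
  input_thread = Thread.new { input_worker }
  input_thread.join
end

begin
  pipeline_thread = Thread.new { pipeline }
  pipeline_thread.join
rescue => e
  puts("main rescue #{e.inspect}")
ensure
  puts("main ensure")
end
```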
Running it produces:

```
plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
main rescue #<RuntimeError: foo>
main ensure
```
The `input_worker` exception handling was skipped and Logstash crashed (`main rescue #<RuntimeError: foo>`).
But simply swapping the join order to

```ruby
worker_thread1.join
worker_thread2.join
```

makes it behave correctly:

```
plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
input_worker rescue #<RuntimeError: foo>
input_worker ensure
main rescue #<RuntimeError: foo>
main ensure
```
This time `input_worker` was able to rescue the exception (`input_worker rescue #<RuntimeError: foo>`), and in Logstash this would have restarted the input plugin.
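To isolate the underlying mechanism, here is a smaller sketch that, unlike the reproduction, does not set `Thread.abort_on_exception`. It shows that `Thread#join` re-raises a dead thread's exception only in the thread that joins that particular thread: a thread blocked joining a different, still-running sub-thread does not see the failure. The thread names `failing` and `slow` are illustrative.

```ruby
# Sketch: Thread#join re-raises a terminated thread's exception, but only
# in the thread that joins that particular thread.
Thread.report_on_exception = false # silence per-thread error reports

failing = Thread.new { sleep(0.1); raise("foo") } # dies after ~0.1s
slow    = Thread.new { sleep(5) }                 # keeps running for a while

# Waiting on the slow thread (1s timeout) says nothing about the failing one:
# join returns nil on timeout and no exception is raised here.
slow_result = slow.join(1)

# The failing thread has died by now; Thread#status is nil for a thread
# that terminated with an exception.
failing_status = failing.status

# The exception only surfaces once the failed thread itself is joined.
rescued = nil
begin
  failing.join
rescue => e
  rescued = e
  puts("rescued #{e.inspect}")
end

slow.kill # clean up the sleeping thread
```

In the original reproduction, `plugin_thread` is in exactly the position of the caller of `slow.join`: it is blocked on `worker_thread2`, so the only place the failure goes is the main thread, via `abort_on_exception`.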
So this logic only works if the first sub-thread joined is the one raising the exception, which is not very practical. I will investigate how to improve this, and will look into the `ThreadsWait` class.
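One possible way to make the outcome independent of join order, sketched under assumptions: instead of joining sub-threads in a fixed order, a hypothetical `join_in_completion_order` helper has each worker push itself onto a `Queue` when it finishes, and the caller joins threads in the order they complete. The first worker to fail is joined first, so its exception is re-raised in the caller (for example, an input's `run` method), where the pipeline `inputworker` rescue can catch it. The helper name and the choice to kill the remaining workers on failure are illustrative, not Logstash code, and the sketch uses only core `Thread` and `Queue` rather than `ThreadsWait`.

```ruby
# Hypothetical helper (not Logstash API): run each block in its own thread and
# join the threads in the order they finish, so the first failure propagates.
def join_in_completion_order(blocks)
  done = Queue.new
  threads = blocks.map do |blk|
    Thread.new do
      Thread.current.report_on_exception = false # the caller handles errors
      begin
        blk.call
      ensure
        done << Thread.current # signal completion, success or failure
      end
    end
  end
  # Thread#join re-raises the finished worker's exception in the caller.
  threads.size.times { done.pop.join }
ensure
  # Assumption: on failure, stop the workers that are still running.
  threads&.each { |t| t.kill if t.alive? }
end

# Usage mirroring the reproduction: the failing worker is listed second.
rescued = nil
begin
  join_in_completion_order([
    -> { sleep(60) },
    -> { sleep(0.1); raise("foo") }
  ])
rescue => e
  rescued = e
  puts("rescued #{e.inspect}")
end
```

Killing the remaining workers is one design choice; a real input would more likely ask its other threads to stop cleanly before letting the exception propagate.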
The kafka input is especially prone to this problem because it basically spawns a number of consumer threads that do the work in the `run` method. I created a specific kafka issue for that: logstash-plugins/logstash-integration-kafka#15
Relates to https://github.com/elastic/logstash/issues/10612