logstash icon indicating copy to clipboard operation
logstash copied to clipboard

input workers exception handling

Open colinsurprenant opened this issue 5 years ago • 2 comments

There are cases of logstash completely crashing by an exception raised from inside an input sub-thread (a thread created by the input plugin to add concurrency to its input duties).

This is troubling since the pipeline inputworker plugin.run is wrapped in a begin ... rescue => e statement and thus any unrescued exception in an input plugin should always be rescued at the pipeline inputworker.

I was able to reproduce this behaviour when an input exception is triggered from within one of its sub-treads. The interesting part is that it depends on the sub-threads join order.

Here's the reproduction code which mimics logstash threads initialization sequence and exception handling.

Thread.abort_on_exception = true
Thread.report_on_exception = false

def plugin_worker2
  begin
    puts("plugin_worker2 start sleep")
    sleep(60)
    puts("plugin_worker2 end sleep")
  ensure
    puts("plugin_worker2 ensure")
  end
end

def plugin_worker1
  begin
    sleep(1)
    puts("plugin_worker1 raising exception")
    raise("foo")
  ensure
    puts("plugin_worker1 ensure")
  end
end

def plugin_run
  worker_thread1 = Thread.new { plugin_worker1 }
  worker_thread2 = Thread.new { plugin_worker2 }

  worker_thread2.join
  worker_thread1.join
end

def input_worker
  begin
    plugin_thread = Thread.new { plugin_run }
    plugin_thread.join
  rescue => e
    puts("input_worker rescue #{e.inspect}")
  ensure
    puts("input_worker ensure")
  end
end

def pipeline
  input_thread = Thread.new { input_worker }
  input_thread.join
end


begin
  pipeline_thread = Thread.new { pipeline }
  pipeline_thread.join
rescue => e
  puts("main rescue #{e.inspect}")
ensure
  puts("main ensure")
end

When running this it produces:

plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
main rescue #<RuntimeError: foo>
main ensure

We see that the input worker exception handling was skipped and logstash crashed (main rescue #<RuntimeError: foo>)

But simply changing the order of the workers join to

  worker_thread1.join
  worker_thread2.join

It behaves correctly:

plugin_worker2 start sleep
plugin_worker1 raising exception
plugin_worker1 ensure
input_worker rescue #<RuntimeError: foo>
input_worker ensure
main rescue #<RuntimeError: foo>
main ensure

We see that the input worker was able to rescue the exception input_worker rescue #<RuntimeError: foo> and in logstash this would have actually restarted the input plugin.

So this logic will only work iif the first sub-thread joined is the one raising the exception which is not very practical. I will investigate how to improve this; will look into the ThreadsWait class.

colinsurprenant avatar Feb 13 '20 01:02 colinsurprenant

The kafka input is especially prone to the problem because it basically spans a number of consumer threads that does the work in the run method. I created a specific kafka issue for that logstash-plugins/logstash-integration-kafka#15

colinsurprenant avatar Feb 21 '20 18:02 colinsurprenant

relates https://github.com/elastic/logstash/issues/10612

kaisecheng avatar Aug 08 '22 14:08 kaisecheng