Restart crashed pipelines
With the introduction of https://github.com/elastic/logstash/pull/12307, we protect the Logstash process from crashing due to an error in a pipeline or pipeline worker.
The consequence of this is that a crashed pipeline will no longer bring Logstash down. The problem with this is that the pipeline won't be restarted, so Logstash will stay up without any running pipelines. Before, in a way, crashing would be beneficial as the operating system's init system would restart the process.
A suggestion to fix this is to always restart a crashed pipeline in logstash, especially if automatic reloading or central management is enabled.
An example configuration:
input { tcp { port => 3333 } }
filter {
if [x] > 1 { drop {} }
}
output { stdout {} }
Now we start logstash with automatic reloading, send a package that crashes the pipeline, and observe that the pipeline terminates but Logstash doesn't stop:
/tmp/logstash-7.11.1
❯ bin/logstash -f cfg -r
Sending Logstash logs to /tmp/logstash-7.11.1/logs which is now configured via log4j2.properties
[2021-02-19T11:53:44,904][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.11.1", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 11.0.9+11 on 11.0.9+11 +indy +jit [darwin-x86_64]"}
[2021-02-19T11:53:46,777][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/private/tmp/logstash-7.11.1/cfg"], :thread=>"#<Thread:0x7941e6cd run>"}
[2021-02-19T11:53:47,614][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.82}
[2021-02-19T11:53:47,774][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-02-19T11:53:47,783][INFO ][logstash.inputs.tcp ][main][7d4d14cbeb4d3b0cc5792a19bc1add2a148ffd455cc054182d3690c99a80151b] Starting tcp input listener {:address=>"0.0.0.0:3333", :ssl_enable=>"false"}
[2021-02-19T11:53:47,812][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-02-19T11:53:48,002][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-02-19T11:54:01,187][ERROR][logstash.codecs.line ][main][7d4d14cbeb4d3b0cc5792a19bc1add2a148ffd455cc054182d3690c99a80151b] Wwwwwweeee {:data=>"1\n"}
[2021-02-19T11:54:01,325][ERROR][logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"", :exception=>Java::JavaLang::NullPointerException, :backtrace=>["org.logstash.config.ir.compiler.EventCondition$Compiler$UnexpectedTypeException.getUnexpectedTypeDetails(EventCondition.java:693)", "org.logstash.config.ir.compiler.EventCondition$Compiler$UnexpectedTypeException.<init>(EventCondition.java:681)", "org.logstash.config.ir.compiler.EventCondition$Compiler.compare(EventCondition.java:453)", "org.logstash.config.ir.compiler.EventCondition$Compiler.lambda$compareFieldToConstant$11(EventCondition.java:444)", "org.logstash.config.ir.compiler.Utils.filterEvents(Utils.java:47)", "org.logstash.generated.CompiledDataset1.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset3.compute(Unknown Source)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:329)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:323)", "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:83)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)", "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "java.base/java.lang.reflect.Method.invoke(Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "private.tmp.logstash_minus_7_dot_11_dot_1.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/private/tmp/logstash-7.11.1/logstash-core/lib/logstash/java_pipeline.rb:295)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:834)"], :thread=>"#<Thread:0x7941e6cd sleep>"}
[2021-02-19T11:54:05,582][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[ .. at this time the process is up but nothing happens .. ]
I'm facing this in 7.10.2 currently. The annoying thing I find about this is that there is no sign in the API (/_node/pipelines, or /_node/stats) that the pipeline is shutdown. Not even in .pipelines.main.reloads
This makes it harder to write a health-check (without resorting to collecting metrics over time)
# curl -s 127.0.0.1:9600/_node/stats | jq .
{
"host": "...",
"version": "7.10.2",
"http_address": "127.0.0.1:9600",
"id": "...",
"name": "...",
"ephemeral_id": "...",
"status": "green",
"snapshot": false,
"pipeline": {
"workers": 4,
"batch_size": 250,
"batch_delay": 50
},
"jvm": {"..."},
"process": {"..."},
"events": {
"in": 1552,
"filtered": 0,
"out": 0,
"duration_in_millis": 0,
"queue_push_duration_in_millis": 5
},
"pipelines": {
"main": {
"events": {
"queue_push_duration_in_millis": 5,
"out": 0,
"filtered": 0,
"duration_in_millis": 0,
"in": 1552
},
"plugins": {"..."},
"reloads": {
"last_failure_timestamp": null,
"failures": 0,
"last_success_timestamp": null,
"last_error": null,
"successes": 0
},
"queue": {"..."},
"hash": "...",
"ephemeral_id": "..."
}
},
"reloads": {
"failures": 0,
"successes": 0
},
"os": {"..."},
"queue": {"..."}
}
So in this case, I detect the presence of this condition by assuming that "javapipeline - Pipeline worker error, the pipeline will be stopped" is the last line in the logs.
how should i restart a failed pipeline?
By restarting Logstash 😭
This problem was so blocking (and dangerous for our business) that I merged all pipelines into one mono-pipeline 🤢🤮
This way, when the only pipeline is blocked, Logstash restarts automatically, hence unlock the pipeline.
I hate this but did not find any better solution.
By restarting Logstash 😭
This problem was so blocking (and dangerous for our business) that I merged all pipelines into one mono-pipeline 🤢🤮
This way, when the only pipeline is blocked, Logstash restarts automatically, hence unlock the pipeline.
I hate this but did not find any better solution.
The original problem, comparing string against number in the issue description was fixed (in Logstash 8.16 if I am not mistaken) and will not cause pipeline crash.
On top of that, Logstash included health report API, which can show RED status with a reasonable status (e.g PIPELINE_TERMINATED), you can leverage this API.
If you please provide you logs, pipeline configs versions (Logstash and plugins) you are using, we might have better insights about the situation. Thanks.