logstash icon indicating copy to clipboard operation
logstash copied to clipboard

Restart crashed pipelines

Open jsvd opened this issue 5 years ago • 5 comments

With the introduction of https://github.com/elastic/logstash/pull/12307, we protect the Logstash process from crashing due to an error in a pipeline or pipeline worker.

The consequence of this is that a crashed pipeline will no longer bring Logstash down. The problem with this is that the pipeline won't be restarted, so Logstash will stay up without any running pipelines. Before, in a way, crashing would be beneficial as the operating system's init system would restart the process.

A suggestion to fix this is to always restart a crashed pipeline in logstash, especially if automatic reloading or central management is enabled.

jsvd avatar Feb 19 '21 11:02 jsvd

An example configuration:

input { tcp { port => 3333 } }
filter {
  if [x] > 1 { drop {} }
}
output { stdout {} }

Now we start logstash with automatic reloading, send a package that crashes the pipeline, and observe that the pipeline terminates but Logstash doesn't stop:

/tmp/logstash-7.11.1
❯ bin/logstash -f cfg -r
Sending Logstash logs to /tmp/logstash-7.11.1/logs which is now configured via log4j2.properties
[2021-02-19T11:53:44,904][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.11.1", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 11.0.9+11 on 11.0.9+11 +indy +jit [darwin-x86_64]"}
[2021-02-19T11:53:46,777][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/private/tmp/logstash-7.11.1/cfg"], :thread=>"#<Thread:0x7941e6cd run>"}
[2021-02-19T11:53:47,614][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.82}
[2021-02-19T11:53:47,774][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-02-19T11:53:47,783][INFO ][logstash.inputs.tcp      ][main][7d4d14cbeb4d3b0cc5792a19bc1add2a148ffd455cc054182d3690c99a80151b] Starting tcp input listener {:address=>"0.0.0.0:3333", :ssl_enable=>"false"}
[2021-02-19T11:53:47,812][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-02-19T11:53:48,002][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-02-19T11:54:01,187][ERROR][logstash.codecs.line     ][main][7d4d14cbeb4d3b0cc5792a19bc1add2a148ffd455cc054182d3690c99a80151b] Wwwwwweeee {:data=>"1\n"}
[2021-02-19T11:54:01,325][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"", :exception=>Java::JavaLang::NullPointerException, :backtrace=>["org.logstash.config.ir.compiler.EventCondition$Compiler$UnexpectedTypeException.getUnexpectedTypeDetails(EventCondition.java:693)", "org.logstash.config.ir.compiler.EventCondition$Compiler$UnexpectedTypeException.<init>(EventCondition.java:681)", "org.logstash.config.ir.compiler.EventCondition$Compiler.compare(EventCondition.java:453)", "org.logstash.config.ir.compiler.EventCondition$Compiler.lambda$compareFieldToConstant$11(EventCondition.java:444)", "org.logstash.config.ir.compiler.Utils.filterEvents(Utils.java:47)", "org.logstash.generated.CompiledDataset1.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset3.compute(Unknown Source)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:329)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:323)", "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:83)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)", "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "java.base/java.lang.reflect.Method.invoke(Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "private.tmp.logstash_minus_7_dot_11_dot_1.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/private/tmp/logstash-7.11.1/logstash-core/lib/logstash/java_pipeline.rb:295)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:834)"], :thread=>"#<Thread:0x7941e6cd sleep>"}
[2021-02-19T11:54:05,582][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}

[ .. at this time the process is up but nothing happens .. ]

jsvd avatar Feb 19 '21 11:02 jsvd

I'm facing this in 7.10.2 currently. The annoying thing I find about this is that there is no sign in the API (/_node/pipelines, or /_node/stats) that the pipeline is shutdown. Not even in .pipelines.main.reloads

This makes it harder to write a health-check (without resorting to collecting metrics over time)

# curl -s 127.0.0.1:9600/_node/stats | jq .
{
    "host": "...",
    "version": "7.10.2",
    "http_address": "127.0.0.1:9600",
    "id": "...",
    "name": "...",
    "ephemeral_id": "...",
    "status": "green",
    "snapshot": false,
    "pipeline": {
      "workers": 4,
      "batch_size": 250,
      "batch_delay": 50
    },
    "jvm": {"..."},
    "process": {"..."},
    "events": {
      "in": 1552,
      "filtered": 0,
      "out": 0,
      "duration_in_millis": 0,
      "queue_push_duration_in_millis": 5
    },
    "pipelines": {
      "main": {
        "events": {
          "queue_push_duration_in_millis": 5,
          "out": 0,
          "filtered": 0,
          "duration_in_millis": 0,
          "in": 1552
        },
        "plugins": {"..."},
        "reloads": {
          "last_failure_timestamp": null,
          "failures": 0,
          "last_success_timestamp": null,
          "last_error": null,
          "successes": 0
        },
        "queue": {"..."},
        "hash": "...",
        "ephemeral_id": "..."
      }
    },
    "reloads": {
      "failures": 0,
      "successes": 0
    },
    "os": {"..."},
    "queue": {"..."}
  }

So in this case, I detect the presence of this condition by assuming that "javapipeline - Pipeline worker error, the pipeline will be stopped" is the last line in the logs.

cameronkerrnz avatar Feb 23 '21 03:02 cameronkerrnz

how should i restart a failed pipeline?

zdyj3170101136 avatar Jul 31 '25 11:07 zdyj3170101136

By restarting Logstash 😭

This problem was so blocking (and dangerous for our business) that I merged all pipelines into one mono-pipeline 🤢🤮

This way, when the only pipeline is blocked, Logstash restarts automatically, hence unlock the pipeline.

I hate this but did not find any better solution.

hilsonp avatar Jul 31 '25 17:07 hilsonp

By restarting Logstash 😭

This problem was so blocking (and dangerous for our business) that I merged all pipelines into one mono-pipeline 🤢🤮

This way, when the only pipeline is blocked, Logstash restarts automatically, hence unlock the pipeline.

I hate this but did not find any better solution.

The original problem, comparing string against number in the issue description was fixed (in Logstash 8.16 if I am not mistaken) and will not cause pipeline crash.

On top of that, Logstash included health report API, which can show RED status with a reasonable status (e.g PIPELINE_TERMINATED), you can leverage this API.

If you please provide you logs, pipeline configs versions (Logstash and plugins) you are using, we might have better insights about the situation. Thanks.

mashhurs avatar Oct 11 '25 00:10 mashhurs