logstash-filter-aggregate
WIP: Fix crash in pipeline environment
Hi,
I am trying to save the aggregate maps in a Logstash instance that runs 8 pipelines.
But the plugin crashes with the following exception:
```
[2021-06-18T10:32:25,956][ERROR][logstash.javapipeline ][smtp11] Pipeline error {:pipeline_id=>"smtp11", :exception=>#<TypeError: nil is not a string>, :backtrace=>[
  "org/jruby/RubyMarshal.java:138:in `load'",
  "/opt/logstash-7.10.2/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.2/lib/logstash/filters/aggregate.rb:132:in `block in register'",
  "org/jruby/RubyIO.java:1158:in `open'",
  "/opt/logstash-7.10.2/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.2/lib/logstash/filters/aggregate.rb:132:in `block in register'",
  "org/jruby/ext/thread/Mutex.java:164:in `synchronize'",
  "/opt/logstash-7.10.2/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.2/lib/logstash/filters/aggregate.rb:97:in `register'",
  "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:228:in `block in register_plugins'",
  "org/jruby/RubyArray.java:1809:in `each'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:227:in `register_plugins'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:586:in `maybe_setup_out_plugins'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:240:in `start_workers'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:185:in `run'",
  "/opt/logstash-7.10.2/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"
], "pipeline.sources"=>[...], :thread=>"#<Thread:0x153f3631 run>"}
```
(See Elastic Case: https://support.elastic.co/customers/s/case/5004M00000i7jHN)
Attaching the pipeline ID to the aggregate maps file path prevents this crash.
My preferred solution would be to add a flag for this behaviour that defaults to true only when more than one pipeline is in use, but I have no clue how to implement that.
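For illustration only, here is a rough sketch of the path-suffixing idea (not the actual diff of this PR, and it assumes `execution_context.pipeline_id` is available to the filter at register time):

```ruby
# Sketch only: make the persisted maps file unique per pipeline by appending
# the pipeline id to the user-configured path.
def register
  unless @aggregate_maps_path.nil?
    @aggregate_maps_path = "#{@aggregate_maps_path}_#{execution_context.pipeline_id}"
  end
  # ...the existing register logic then loads/stores the maps at this per-pipeline path...
end
```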
Regards, Patrick
Hi,
I can't accept this PR, for 2 main reasons:
- it's odd to store the aggregate maps somewhere other than where the user asked to
- it's a breaking change: on the next Logstash startup the file is not looked up in the right place, so the stored maps are not loaded and are lost.
That said, given your error and your explanations, your problem seems to be that you have (at least) 2 different pipelines with the same "aggregate_maps_path" setting. At Logstash startup, the first pipeline loads the file and then deletes it; the second pipeline then tries to load the file but fails because it has already been deleted by the first one.
The right solution is to set a different "aggregate_maps_path" in each pipeline.
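For example (the pipeline names and paths below are made up for illustration, and the rest of each filter is elided), each pipeline's config would point the filter at its own file:

```
# smtp11 pipeline config (illustrative)
aggregate {
  task_id             => "%{connection_id}"
  code                => "..."   # your existing aggregate code
  aggregate_maps_path => "/var/tmp/aggregate_smtp11"
}

# smtp12 pipeline config (illustrative)
aggregate {
  task_id             => "%{connection_id}"
  code                => "..."   # your existing aggregate code
  aggregate_maps_path => "/var/tmp/aggregate_smtp12"
}
```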
Hi,
> I can't accept this PR, for 2 main reasons:
I never expected this crap to be merged.
In Elastic Support Case #00738461 I was asked to create it.
> - it's odd to store the aggregate maps somewhere other than where the user asked to
I completely agree.
> - it's a breaking change: on the next Logstash startup the file is not looked up in the right place, so the stored maps are not loaded and are lost.
Agreed again, but does it work in pipelined environments other than mine?
> That said, given your error and your explanations, your problem seems to be that you have (at least) 2 different pipelines with the same "aggregate_maps_path" setting. At Logstash startup, the first pipeline loads the file and then deletes it; the second pipeline then tries to load the file but fails because it has already been deleted by the first one.
Correct. I have 4 active pipelines correlating the syslog output of a 4-node mail cluster.
> The right solution is to set a different "aggregate_maps_path" in each pipeline.
That's where I, and the support case, started.
Personally I would prefer to solve it at the Logstash level, but unfortunately I found no way to do that.
What I tried is:
```
ruby {
  code => '
    event.set("[aggregateCache]", "/var/tmp/logstash_aggregate_" + execution_context.pipeline.pipeline_id)
  '
}
aggregate {
  task_id                      => "%{connection_id}"
  code                         => "map['pipeline_id'] = execution_context.pipeline.pipeline_id"
  push_map_as_event_on_timeout => false
  timeout_task_id_field        => "connection_id"
  timeout                      => 6000 # 10 minutes timeout
  #aggregate_maps_path => "/var/tmp/aggregate"    # version 1: only one file created -> seems not to work
  #aggregate_maps_path => "%{aggregateCache}"     # version 2: file does not get created
  aggregate_maps_path  => [aggregateCache]        # version 3: file does not get created
}
```
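As far as I can tell from the backtrace above, "aggregate_maps_path" is used in register(), i.e. at pipeline startup before any event exists, so versions 2 and 3 presumably cannot work: there is no event whose [aggregateCache] field could be substituted at that point. The only variant I know that works is a literal, distinct path hard-coded into each pipeline's file (the path below is just an example):

```
# version 4: literal per-pipeline path, duplicated by hand in every pipeline config
aggregate_maps_path => "/var/tmp/logstash_aggregate_smtp11"
```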
I would like to get around the bug I am hitting...
Any help is welcome!
Regards, Patrick