logstash-filter-aggregate
logstash-filter-aggregate plugin is not merging two CDR (logs) into one index
Logstash information:
Please include the following information:
- Logstash version: 7.17.6
- Logstash installation source: docker
- How is Logstash being run: docker
- How was the Logstash Plugin installed
JVM (e.g. java -version):
If the affected version of Logstash is 7.9 (or earlier), or if it is NOT using the bundled JDK or using the 'no-jdk' version in 7.10 (or higher), please provide the following information:
- JVM version (java -version)
- JVM installation source (e.g. from the Operating System's package manager, from source, etc).
- Value of the JAVA_HOME environment variable if set.
OS version (uname -a if on a Unix-like system): Linux logstash-os-west2a-pvt.xyz.test.biz 5.10.209-198.858.amzn2.aarch64 #1 SMP Tue Feb 13 18:46:45 UTC 2024 aarch64 aarch64 aarch64 GNU/Linux
Description of the problem including expected versus actual behavior:
I have two kinds of logs in OpenSearch, ecc-tdr and cdn-tdr. Both are stored in a single index named "ecc-cdn-combined4-prod-tdrs-2024.02".
Using a Logstash pipeline, I read from the "ecc-cdn-combined4-prod-tdrs-2024.02" index in OpenSearch and use the logstash-filter-aggregate plugin to merge both logs (transaction call records) into one merged index, "ecc-cdn-merged".
Expected: for each transaction, one merged log (TDR) containing all required fields from both the ecc-tdr and the cdn-tdr.
Actual: the two logs are not merged into one TDR.
Steps to reproduce:
1. [rcd_sent] == "logo" identifies ecc-tdrs.
2. [url.path] =~ /.+bmp/ identifies cdn-tdrs.
3. Wherever the transaction_id matches between the two, I need those logs in the merged index.
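To make the expected result concrete, here is a minimal plain-Ruby sketch of the merge described in the steps above. It runs outside Logstash and is not the plugin's implementation. The helper names (`ecc_tdr?`, `cdn_tdr?`, `merge_tdrs`) and all sample values are hypothetical, and it assumes the ecc-tdr for a transaction arrives before the matching cdn-tdr.

```ruby
# Hypothetical stand-alone sketch of the merge I expect; it does not use Logstash.
# Field names are taken from the filter config; the sample values are made up.

# Step 1: an ecc-tdr is any record with rcd_sent == "logo".
def ecc_tdr?(event)
  event['rcd_sent'] == 'logo'
end

# Step 2: a cdn-tdr is any record whose url.path matches /.+bmp/.
def cdn_tdr?(event)
  event['url.path'].to_s.match?(/.+bmp/)
end

# Step 3: remember ecc fields per transaction_id, then copy them onto the
# cdn-tdr that carries the same transaction_id.
def merge_tdrs(events)
  maps = {}     # transaction_id => fields remembered from the ecc-tdr
  merged = []   # merged records that should land in "ecc-cdn-merged"
  events.each do |event|
    id = event['transaction_id']
    if ecc_tdr?(event)
      maps[id] = {
        'ecc_customer'     => event['enterprise_billing_id'],
        'ecc_req'          => event['raw_req_p'],
        'ecc_resp'         => event['raw_resp_p'],
        'ecc_request_time' => event['request_time']
      }
    elsif cdn_tdr?(event) && maps.key?(id)
      ecc_fields = maps.delete(id) # the task ends once the cdn-tdr is seen
      merged << event.merge(ecc_fields).merge('cdn_customer' => event['url.path'])
    end
  end
  merged
end

# Made-up sample input: t1 has both records, t2 has only an ecc-tdr.
events = [
  { 'transaction_id' => 't1', 'rcd_sent' => 'logo',
    'enterprise_billing_id' => 'cust-1', 'raw_req_p' => 'req-1',
    'raw_resp_p' => 'resp-1', 'request_time' => '2024-02-01T00:00:00Z' },
  { 'transaction_id' => 't2', 'rcd_sent' => 'logo',
    'enterprise_billing_id' => 'cust-2', 'raw_req_p' => 'req-2',
    'raw_resp_p' => 'resp-2', 'request_time' => '2024-02-01T00:00:05Z' },
  { 'transaction_id' => 't1', 'url.path' => '/img/logo.bmp' }
]

merged = merge_tdrs(events)
```

In this sketch only t1 yields a merged record. t2 has no cdn-tdr, which is the case I intend the `_cdn_tdr_missing` timeout tag to cover in the real pipeline.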
Provide logs (if relevant):
Filter code:
filter {
  if ( [rcd_sent] == "logo" ) {
    aggregate {
      task_id => "%{transaction_id}"
      code => "
        map['ecc_customer'] = event.get('enterprise_billing_id')
        map['transaction_id'] = event.get('transaction_id')
        map['ecc_req'] = event.get('raw_req_p')
        map['ecc_resp'] = event.get('raw_resp_p')
        map['ecc_request_time'] = event.get('request_time')
      "
      map_action => "create"
      timeout => 600 # 10 minutes timeout
      timeout_tags => ['_cdn_tdr_missing']
      push_map_as_event_on_timeout => "true"
      push_previous_map_as_event => "true"
      timeout_timestamp_field => "@timestamp"
      timeout_task_id_field => "task_id"
    }
  }
  if ( [url.path] =~ /.+bmp/ ) {
    mutate {
      add_field => { "cdn_customer" => "%{url.path}" }
    }
    aggregate {
      task_id => "%{transaction_id}"
      code => "
        event.set('ecc_customer', map['ecc_customer'])
        event.set('ecc_req', map['ecc_req'])
      "
      map_action => "create_or_update"
      timeout => 300
      end_of_task => true
    }
  }
}