fluent-plugin-record-modifier

Question: replace whole record with single field from it

Open nhlushak opened this issue 4 years ago • 2 comments

I want to use this plugin to catch records that fail in the Elasticsearch output (e.g. "rejected by Elasticsearch") and send them to a "dead-letter" output. Those failed records are emitted as fluent.warn events, with the original message stored in the "record" key of the warn event. What I want is to take this "record" key and pass it down the Fluentd pipeline as the whole message itself, under a new tag. I did not find any documentation describing this, neither for the record_transformer plugin nor for this one. Here is an example of what I want. Original record:

2020-05-21 10:34:35.497925679 +0000 fluent.warn: 
{
    "error": "#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'object mapping for [sample] tried to parse field [sample] as object, but found a concrete value'>",
    "location": null,
    "tag": "test.log",
    "time": 1589206011,
    "record": {
        "foo": "bar",
        "key": "value",
        "sample": [],
        "blah-blah": 133163771
    },
    "message": "dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'object mapping for [sample] tried to parse field [sample] as object, but found a concrete value'" location=nil tag="test.log" time=1589206011 record={\"foo\"=>\"bar\", \"key\"=>\"value\", \"sample\"=>[], \"blah-blah\"=>133163771}"
}

Modified record:

2020-05-21 10:34:36.497925679 +0000 dead.log: 
{
    "foo": "bar",
    "key": "value",
    "sample": [],
    "blah-blah": 133163771
}
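
For illustration, here is a rough, untested sketch of the kind of config I imagine, based on the example above. It only works when the nested field names (foo, key, sample, blah-blah) are known in advance, so it is not the generic "replace the whole record" solution I am after:

<match fluent.warn>
  @type rewrite_tag_filter
  <rule>
    # match the "tag" field stored inside the warn record and re-emit under a new tag
    key tag
    pattern /^test\.log$/
    tag dead.log
  </rule>
</match>

<filter dead.log>
  @type record_transformer
  enable_ruby true
  renew_record true    # start from an empty record instead of the warn wrapper
  auto_typecast true   # keep arrays and integers instead of stringifying them
  <record>
    foo ${record.dig("record", "foo")}
    key ${record.dig("record", "key")}
    sample ${record.dig("record", "sample")}
    blah-blah ${record.dig("record", "blah-blah")}
  </record>
</filter>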

nhlushak avatar May 22 '20 08:05 nhlushak

@NikitaGl also has a similar question to yours: https://stackoverflow.com/questions/64155725/what-is-the-behavior-of-fluentd-when-it-gets-parser-error

Have you found the answer?

agung-kargo avatar Oct 01 '20 13:10 agung-kargo

@agung-kargo Yes, sort of. I figured this out using the format plugin in the output section:

<match fluent.warn>
  @type rewrite_tag_filter
  @id rewrite_tag_dead
  <rule>
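    # "key tag" matches against the "tag" field stored inside the fluent.warn record
    # (the original tags in this setup start with "logmessage")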
    key tag
    pattern /^(logmessage.*)$/
    tag log_out.dead
  </rule>
</match>

<match log_out.dead>
  @type file
  @id out_file_dead
  path /data/output/log_rejected/log-%Y%m%d
  <buffer>
    @type file
    path /data/buffer/file/log_dead/
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 1
    flush_interval 15s
    retry_forever true
    retry_max_interval 3m
    chunk_limit_size 10M
    total_limit_size 5G
    overflow_action block
    flush_at_shutdown true   
  </buffer>
  <format>
    # write only the value of the "record" key, one event per line
    @type single_value
    add_newline true
    message_key record
  </format>   
</match>

The record key contains the original message line that caused Fluentd to throw the error. The only drawback of this method is that the single_value format type is supported only for file output.
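
With this in place, the file under /data/output/log_rejected/ contains just the value of the record key, one line per failed event (add_newline true appends the newline). If you need to send these events to an output other than file, an untested alternative is to reduce the record in a filter before the output instead of at format time; the key name "raw" below is just my own placeholder:

<filter log_out.dead>
  @type record_transformer
  enable_ruby true
  renew_record true   # drop the warn wrapper (error, location, tag, time, message)
  auto_typecast true
  <record>
    # "raw" is an arbitrary key; it ends up holding the original rejected payload
    raw ${record["record"]}
  </record>
</filter>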

nhlushak avatar Oct 01 '20 15:10 nhlushak