fluent-plugin-kafka

Kafka messages rejected by OpenSearch when the JSON message contains a field named "message"

Open • vishalmamidi opened this issue 2 years ago • 2 comments

Describe the bug

Kafka messages are rejected by OpenSearch when the JSON Kafka message contains a field named `message`.

To Reproduce

The following JSON is indexed without any problem:

    {
        "event": "created",
        "data": {
            "account_number": "222222223",
            "amount": 9900,
            "check_number": "11",
            "created_at": "2019-12-12T22:34:59Z",
            "currency": "USD",
            "deposit_date": "2019-12-12",
            "id": "9e5c22be-2145-4da4-963f-b0434765d18f",
            "lockbox_number": "12345",
            "memo_field": "Christmas Tip",
            "object": "paper_item",
            "remitter_name": "Cristina Angela",
            "routing_number": "021000021",
            "status": "pending",
            "transaction_id": null,
            "transaction_line_item_id": null,
            "updated_at": "2019-12-12T22:34:59Z"
        }
    }

But when I rename the `data` field to `message`, as in the example below, the event is rejected:

    {
        "event": "created",
        "message": {
            "account_number": "222222223",
            "amount": 9900,
            "check_number": "11",
            "created_at": "2019-12-12T22:34:59Z",
            "currency": "USD",
            "deposit_date": "2019-12-12",
            "id": "9e5c22be-2145-4da4-963f-b0434765d18f",
            "lockbox_number": "12345",
            "memo_field": "Christmas Tip",
            "object": "paper_item",
            "remitter_name": "Cristina Angela",
            "routing_number": "021000021",
            "status": "pending",
            "transaction_id": null,
            "transaction_line_item_id": null,
            "updated_at": "2019-12-12T22:34:59Z"
        }
    }
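The two events differ only in the type of the `message` value. The error log in this issue reports that the target index already maps `message` as `text` (possibly through an index template, or because earlier documents carried a plain string there; the source does not say which). The sketch below is a deliberately simplified toy model of that conflict, not OpenSearch's real mapper: `EXISTING_MAPPING` and `check_document` are hypothetical names that only illustrate why a JSON object cannot be stored in a field already mapped as `text`.

```python
import json

# Hypothetical, simplified stand-in for the index mapping reported in the
# error log: the top-level field "message" is already mapped as "text".
EXISTING_MAPPING = {"message": "text"}


def check_document(doc: dict, mapping: dict) -> list:
    """Toy model: report top-level fields whose value is a JSON object
    while the existing mapping says the field is "text"."""
    errors = []
    for field, value in doc.items():
        if mapping.get(field) == "text" and isinstance(value, dict):
            errors.append(
                f"failed to parse field [{field}] of type [text]: got an object"
            )
    return errors


ok_event = json.loads('{"event": "created", "data": {"amount": 9900}}')
bad_event = json.loads('{"event": "created", "message": {"amount": 9900}}')

print(check_document(ok_event, EXISTING_MAPPING))  # -> []
print(check_document(bad_event, EXISTING_MAPPING))
```

The first event passes because `data` has no conflicting mapping; the second fails only because `message` is pre-mapped as `text`.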

Expected behavior

Both events, with and without the `message` field, are expected to be indexed the same way.

Your Environment

- Fluentd version: 1.14.3
- fluent-plugin-kafka version: 0.17.3
- ruby-kafka version: 
- Operating system: CentOS
- Kernel version:

Your Configuration

    <source>
      @type kafka_group
      brokers b-1.amazonaws.com:9092,b-2.amazonaws.com:9092
      consumer_group amazon.broker-2
      topics jb-audit-user
      format json
    </source>

      <match jb-audit-user>
        @type opensearch

        ssl_verify false
        @log_level debug

        logstash_format true
        logstash_prefix jb-audit-user # defaults to "logstash"
        logstash_prefix_separator -     # defaults to "-"
        logstash_dateformat %Y.%m       # defaults to "%Y.%m.%d"

        user "xx"
        password "xx"

        <endpoint>
          url "https://vpc.es.amazonaws.com:443"
          region "us-east-1"
        </endpoint>

        <buffer>
          @type file
          path /var/log/fluentd-buffers/kubernetes-jb-audit-user.system.buffer
          flush_mode interval
          flush_interval 1s
          flush_thread_count 4
          chunk_full_threshold 0.9
        </buffer>
      </match>
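One way to sidestep the conflict without touching the Kafka input is to rewrite the record before it reaches the `opensearch` output, for example in a Fluentd filter such as the built-in `record_transformer`. The Python sketch below only illustrates the record transformation itself, not Fluentd configuration. `rename_message`, `stringify_message`, and the target key `payload` are hypothetical names chosen for illustration.

```python
import json
from typing import Any, Dict


def rename_message(record: Dict[str, Any], new_key: str = "payload") -> Dict[str, Any]:
    """Move an object-valued "message" field to `new_key` (a hypothetical
    name); string-valued "message" fields are left untouched."""
    out = dict(record)  # shallow copy so the caller's record is not mutated
    if isinstance(out.get("message"), dict):
        out[new_key] = out.pop("message")
    return out


def stringify_message(record: Dict[str, Any]) -> Dict[str, Any]:
    """Alternative: serialize an object-valued "message" to a JSON string so it
    fits an existing `text` mapping. The nested keys are then no longer
    separate fields in the index."""
    out = dict(record)
    if isinstance(out.get("message"), dict):
        out["message"] = json.dumps(out["message"], sort_keys=True)
    return out


event = {"event": "created", "message": {"amount": 9900, "currency": "USD"}}
print(rename_message(event))
# -> {'event': 'created', 'payload': {'amount': 9900, 'currency': 'USD'}}
print(stringify_message(event))
# -> {'event': 'created', 'message': '{"amount": 9900, "currency": "USD"}'}
```

Renaming keeps the nested fields queryable; stringifying keeps the field name but trades away structured search on its contents.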

Your Error Log

    2022-02-14 12:49:13 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::OpenSearchErrorHandler::OpenSearchError error="400 - Rejected by OpenSearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id 'Xe9H-H4BIPPY1LUJqWRI'. Preview of field's value: '{website=https://SrinivasStudios.com, notes=, applicationActive=false, parentEntity=Srinivas Studios, createdDateTime=1643961318752, [email protected], clientType=Consumer, statusComments=Approved by JB 14-02-2022 06:19 PM IST, createdBy=null, contact=[], name=Srinivas Studios, clientSegment=Proprietary Trading Organizations, faxNumber=8979879879, modifiedBy=null, modifiedDateTime=1644842948940, id=143001, statusUpdateTime=1644842948000, status=Approved}''" location=nil tag="jb-audit-user" time=2022-02-14 12:49:10.332082686 +0000 record={"X-corelation-ID"=>nil, "requestId"=>"67726045-a99e-42ab-a485-faf8544ecf5a", "eventIndex"=>"jb-audit-user", "message"=>{"createdBy"=>nil, "createdDateTime"=>1643961318752, "modifiedBy"=>nil, "modifiedDateTime"=>1644842948940, "id"=>143001, "name"=>"Srinivas Studios", "emailAddress"=>"[email protected]", "clientType"=>"Consumer", "clientSegment"=>"Proprietary Trading Organizations", "faxNumber"=>"8979879879", "website"=>"https://SrinivasStudios.com", "parentEntity"=>"Srinivas Studios", "notes"=>"", "status"=>"Approved", "statusUpdateTime"=>1644842948000, "statusComments"=>"Approved by JB 14-02-2022 06:19 PM IST", "contact"=>[], "applicationActive"=>false}, "event"=>"POSTUPDATE", "eventTimestamp"=>1644842948941}
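When many events are dumped like this, it can help to extract the offending field and its mapped type from the warning automatically. A minimal sketch using Python's `re` module, assuming the reason text keeps the `failed to parse field [NAME] of type [TYPE]` wording seen in the log line above; `parse_mapping_error` is a hypothetical helper, not part of Fluentd or the OpenSearch plugin.

```python
import re
from typing import Optional, Tuple

# Matches the wording of the OpenSearch reason quoted in the log line above.
_MAPPING_ERROR = re.compile(
    r"mapper_parsing_exception.*?failed to parse field \[(?P<field>[^\]]+)\] "
    r"of type \[(?P<type>[^\]]+)\]"
)


def parse_mapping_error(log_line: str) -> Optional[Tuple[str, str]]:
    """Return (field, mapped_type) for a mapper_parsing_exception, else None."""
    match = _MAPPING_ERROR.search(log_line)
    if match is None:
        return None
    return match.group("field"), match.group("type")


line = (
    "error=\"400 - Rejected by OpenSearch [error type]: mapper_parsing_exception "
    "[reason]: 'failed to parse field [message] of type [text] in document with "
    "id 'Xe9H-H4BIPPY1LUJqWRI'.\""
)
print(parse_mapping_error(line))  # -> ('message', 'text')
```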


Additional context

_No response_

vishalmamidi • Feb 14 '22 14:02

Hey @vishalmamidi, this does not sound like an issue with the Kafka plugin, but rather something specific to your OpenSearch setup. Could you look at your index mapping and check whether the `message` field is mapped as `text`? That is what the error log indicates.

raytung • May 10 '22 22:05
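To follow the suggestion above, the mapping can be fetched with OpenSearch's `GET <index>/_mapping` API, whose response has the shape `{index_name: {"mappings": {"properties": {...}}}}`. The sketch below walks such a response and reports the type of one top-level field. The sample response and the index name `jb-audit-user-2022.02` (derived from the `logstash_prefix`, separator, and `logstash_dateformat` settings in the configuration) are illustrative assumptions, not output captured from the reporter's cluster; `field_types` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def field_types(mapping_response: Dict[str, Any], field: str) -> Dict[str, Optional[str]]:
    """Return {index_name: type} for a top-level field in a GET <index>/_mapping
    response. None means the field is not mapped; object fields carry
    "properties" but no "type", so they are reported as "object"."""
    result: Dict[str, Optional[str]] = {}
    for index_name, body in mapping_response.items():
        props = body.get("mappings", {}).get("properties", {})
        spec = props.get(field)
        result[index_name] = None if spec is None else spec.get("type", "object")
    return result


# Illustrative response, NOT captured from the reporter's cluster.
sample = {
    "jb-audit-user-2022.02": {
        "mappings": {
            "properties": {
                "event": {"type": "text"},
                "message": {"type": "text"},
            }
        }
    }
}
print(field_types(sample, "message"))  # -> {'jb-audit-user-2022.02': 'text'}
```

If `message` turns out to be `text`, note that an existing field's type cannot be changed in place: a mapping fix typically applies to a new index (for example through an index template) or requires a reindex.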

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.

github-actions[bot] • Aug 09 '22 10:08

This issue was automatically closed because it remained stale for 30 days.

github-actions[bot] • Sep 09 '22 10:09