fluent-plugin-kafka
Kafka message rejected by OpenSearch when the JSON message contains a field named "message"
Describe the bug
Kafka messages are rejected by OpenSearch when the JSON payload contains a field named `message`.
To Reproduce
The following JSON is indexed without any issue:
{
"event": "created",
"data": {
"account_number": "222222223",
"amount": 9900,
"check_number": "11",
"created_at": "2019-12-12T22:34:59Z",
"currency": "USD",
"deposit_date": "2019-12-12",
"id": "9e5c22be-2145-4da4-963f-b0434765d18f",
"lockbox_number": "12345",
"memo_field": "Christmas Tip",
"object": "paper_item",
"remitter_name": "Cristina Angela",
"routing_number": "021000021",
"status": "pending",
"transaction_id": null,
"transaction_line_item_id": null,
"updated_at": "2019-12-12T22:34:59Z"
}
}
But when I rename the JSON field `data` to `message`, the document is rejected. Example of the rejected JSON:
{
"event": "created",
"message": {
"account_number": "222222223",
"amount": 9900,
"check_number": "11",
"created_at": "2019-12-12T22:34:59Z",
"currency": "USD",
"deposit_date": "2019-12-12",
"id": "9e5c22be-2145-4da4-963f-b0434765d18f",
"lockbox_number": "12345",
"memo_field": "Christmas Tip",
"object": "paper_item",
"remitter_name": "Cristina Angela",
"routing_number": "021000021",
"status": "pending",
"transaction_id": null,
"transaction_line_item_id": null,
"updated_at": "2019-12-12T22:34:59Z"
}
}
Expected behavior
Both JSON documents, with and without the `message` field, should be accepted and indexed.
Your Environment
- Fluentd version: 1.14.3
- fluent-plugin-kafka version: 0.17.3
- ruby-kafka version:
- Operating system: CentOS
- Kernel version:
Your Configuration
<source>
@type kafka_group
brokers b-1.amazonaws.com:9092,b-2.amazonaws.com:9092
consumer_group amazon.broker-2
topics jb-audit-user
format json
</source>
<match jb-audit-user>
@type opensearch
ssl_verify false
@log_level debug
logstash_format true
logstash_prefix jb-audit-user # defaults to "logstash"
logstash_prefix_separator - # defaults to "-"
logstash_dateformat %Y.%m # defaults to "%Y.%m.%d"
user "xx"
password "xx"
<endpoint>
url "https://vpc.es.amazonaws.com:443"
region "us-east-1"
</endpoint>
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes-jb-audit-user.system.buffer
flush_mode interval
flush_interval 1s
flush_thread_count 4
chunk_full_threshold 0.9
</buffer>
</match>
Your Error Log
2022-02-14 12:49:13 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::OpenSearchErrorHandler::OpenSearchError error="400 - Rejected by OpenSearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id 'Xe9H-H4BIPPY1LUJqWRI'. Preview of field's value: '{website=https://SrinivasStudios.com, notes=, applicationActive=false, parentEntity=Srinivas Studios, createdDateTime=1643961318752, [email protected], clientType=Consumer, statusComments=Approved by JB 14-02-2022 06:19 PM IST, createdBy=null, contact=[], name=Srinivas Studios, clientSegment=Proprietary Trading Organizations, faxNumber=8979879879, modifiedBy=null, modifiedDateTime=1644842948940, id=143001, statusUpdateTime=1644842948000, status=Approved}''" location=nil tag="jb-audit-user" time=2022-02-14 12:49:10.332082686 +0000 record={"X-corelation-ID"=>nil, "requestId"=>"67726045-a99e-42ab-a485-faf8544ecf5a", "eventIndex"=>"jb-audit-user", "message"=>{"createdBy"=>nil, "createdDateTime"=>1643961318752, "modifiedBy"=>nil, "modifiedDateTime"=>1644842948940, "id"=>143001, "name"=>"Srinivas Studios", "emailAddress"=>"[email protected]", "clientType"=>"Consumer", "clientSegment"=>"Proprietary Trading Organizations", "faxNumber"=>"8979879879", "website"=>"https://SrinivasStudios.com", "parentEntity"=>"Srinivas Studios", "notes"=>"", "status"=>"Approved", "statusUpdateTime"=>1644842948000, "statusComments"=>"Approved by JB 14-02-2022 06:19 PM IST", "contact"=>[], "applicationActive"=>false}, "event"=>"POSTUPDATE", "eventTimestamp"=>1644842948941}
### Additional context
_No response_
Hey @vishalmamidi. This does not sound like an issue related to the Kafka plugin, but rather one specific to your OpenSearch setup. Perhaps you could look into your index mapping and confirm whether the `message` field is mapped as `text`, as that's what the error log is indicating.
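To make the suspected mapping conflict concrete, here is a minimal Python sketch. It is not how OpenSearch validates documents internally; it only mimics the rule the error log points at, namely that a field already mapped as a scalar type such as `text` cannot accept a JSON object. The mapping dictionary, the function names, and the `_obj` suffix are all assumptions made for illustration, and the records are shortened versions of the two payloads from the report.

```python
from typing import Any, Dict, List

# Scalar field types that cannot hold a JSON object (illustrative subset only).
SCALAR_TYPES = {"text", "keyword", "long", "integer", "double", "boolean", "date"}


def find_object_conflicts(record: Dict[str, Any], mapping: Dict[str, str]) -> List[str]:
    """Return top-level fields that are mapped as a scalar type but carry an object."""
    return [
        name
        for name, value in record.items()
        if isinstance(value, dict) and mapping.get(name) in SCALAR_TYPES
    ]


def rename_conflicts(
    record: Dict[str, Any], mapping: Dict[str, str], suffix: str = "_obj"
) -> Dict[str, Any]:
    """Return a copy of the record with conflicting fields renamed.

    Hypothetical workaround: the suffix is arbitrary, not a plugin convention.
    """
    conflicts = set(find_object_conflicts(record, mapping))
    return {(key + suffix if key in conflicts else key): value for key, value in record.items()}


# Assumption: `message` already exists in the index mapping as `text`,
# which is what the mapper_parsing_exception in the error log suggests.
assumed_mapping = {"message": "text", "event": "text"}

ok_record = {"event": "created", "data": {"amount": 9900}}
bad_record = {"event": "created", "message": {"amount": 9900}}

print(find_object_conflicts(ok_record, assumed_mapping))   # []
print(find_object_conflicts(bad_record, assumed_mapping))  # ['message']
print(rename_conflicts(bad_record, assumed_mapping))
```

If the real index mapping does show `message` as `text`, the usual options are to rename the field before it reaches the output (as `rename_conflicts` imitates) or to write to an index whose mapping defines `message` as an object.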
This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days
This issue was automatically closed because of stale in 30 days