kafka-connect-elasticsearch
Data Missing With Elasticsearch Kafka Connector
I have been running into a strange issue with Kafka and the Confluent sink connectors in my setup. I have two Kafka Connect sinks working on the same Kafka topics: an S3 sink and an Elasticsearch sink, both configured to read from the same topics, each with its own consumer group. As far as I understand, both should end up with the same data. What we are observing instead is that the Elasticsearch sink receives far less data than what is persisted by the S3 sink. A quick check shows that while S3 contains 100% of the data produced to the topic, Elasticsearch has only about 10% of it.
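Since each sink has its own consumer group, one way to confirm that both are actually reading everything is to compare their lag with the standard consumer-groups tool (the group names below are placeholders; by default Connect names a sink's group connect-<connector name>):

kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group connect-s3-sink
kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group connect-elasticsearch-sink

If both groups show little or no lag while Elasticsearch still holds only ~10% of the documents, the records are being consumed and then dropped before indexing rather than never read.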
Kafka: 2.5.0
Confluent S3 connector version: 5.5.1
Confluent Elasticsearch connector version: 5.5.1
Config I have for the Elasticsearch connector:
topics: "topic1,topic2"
key.ignore: "true"
schema.ignore: "true"
timezone: "UTC"
connection.url: ""
offset.flush.timeout.ms: "180000"
session.timeout.ms: "600000"
connection.username: elastic
elastic.security.protocol: SSL
elastic.https.ssl.keystore.type: JKS
elastic.https.ssl.truststore.type: JKS
type.name: "_doc"
value.converter.schemas.enable: "false"
key.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
behavior.on.malformed.documents: "warn"
Is there any config I am missing? Why is data missing for a single topic? I don't see any logs that would indicate an issue with the connector. Any pointers that could help me out here would be appreciated.
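For completeness, these are the connector-level settings that, as far as I understand, control whether records can be skipped silently; the values below are only illustrative, not what I am running:

behavior.on.malformed.documents: "fail"    # "warn" (used above) and "ignore" skip documents Elasticsearch rejects
drop.invalid.message: "false"              # when true, records that cannot be converted are dropped
flush.timeout.ms: "180000"                 # connector-level flush timeout; offset.flush.timeout.ms above is a worker setting
batch.size: "2000"
max.buffered.records: "20000"
errors.tolerance: "none"                   # "all" plus errors.log.enable / a dead letter queue makes skipped records visible
errors.log.enable: "true"
errors.deadletterqueue.topic.name: "dlq-elasticsearch-sink"    # hypothetical DLQ topic name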
Below is a sample document sent through both topics.
{
"customer_id": "customer1",
"metadata-sdk_timestamp": "2022-05-13T16:09:02.875494+00:00",
"metadata-version": 1,
"description": "Server object changed",
"objects_affected": [
"Platform"
],
"type": "topic_1_object",
"object_id": 39,
"object_data-override_updated_at": "2022-05-13T16:09:01.812720+00:00",
"object_data-remaining_range": 321,
"object_data-fuel_level": null,
"object_data-alert_type": "ongoing_rental",
"object_data-alert_id": 1748,
"object_data-vin": "0011MYMOBILE01292021",
"object_data-fuel_percentage": 0,
"object_data-alert_level": "critical",
"object_data-fuel_type": "",
"object_data-object_id": 39,
"object_data-alert_timestamp": "2022-05-13T12:08:01.908124+00:00",
"object_data-liquid_fuel_percentage": 100,
"object_data-battery_level": 0,
"object_data-gps_timestamp": "2022-05-13T09:07:21.959000+00:00",
"object_data-event_name": "STATUS_ALERT",
"object_data-ev_battery_percentage": null,
"object_data-alert_end_timestamp": null,
"object_data-entity": "status_alert",
"object_data-cs_battery_level": 0,
"object_data-door_status": "LOCKED",
"subtype": null,
"name": "STATUS_ALERT",
"action": "fullsnapshot",
"timestamp": "2022-05-13T16:09:01.812720+00:00"
I have also encountered the same problem. When aggregating data through ES, the doc_count in the aggregation results keeps decreasing.
Same here, not 10%, but around 70% is stored, while the rest of the data is missing.
Found the issue in our case: since there is a huge amount of data, we were hitting the flush timeout. The ERROR can be seen in the connector log (in our case /var/log/kafka/connect.log). Increasing the timeout solved the issue.
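For reference, the timeout in question is presumably the connector-level flush.timeout.ms (the offset.flush.timeout.ms in the original config is a worker setting), and the error in the worker log is typically something like "Flush timeout expired with unflushed records". A minimal sketch of the kind of change that can help, with illustrative values:

flush.timeout.ms: "600000"    # allow more time for in-flight bulk requests to complete
batch.size: "500"             # optionally smaller bulk batches so each flush finishes sooner
max.retries: "10"
retry.backoff.ms: "1000"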