kafka-connect-elasticsearch

Kafka Connect for Elasticsearch fails to publish a few messages out of 10 million to Elasticsearch

Open vikasillumina opened this issue 3 years ago • 5 comments

Hi there,

We have Kafka Connect set up to sink events from Kafka to Elasticsearch. We are experiencing a data consistency issue and were hoping to get some guidance here in case the problem is on our end.

We are using the open-source Kafka Connect image confluentinc/cp-kafka-connect-base:6.1.0. It works really well for our regular use, but a load test revealed issues. We applied load that resulted in 10 million unique events on the Kafka topic. When we checked Elasticsearch, we found 96 documents were missing; they never made it to the index.

We checked the Kafka Connect logs and the Kubernetes pod where Kafka Connect runs and found the following:

  1. CPU usage on the pod was very high, which caused the pod to get killed multiple times throughout the load test. Not sure whether this could explain the missing documents. We have since addressed this by adding more CPU and autoscaling.

  2. In the logs we saw a Gateway Timeout error (504) returned from the Elasticsearch service itself, which in turn appears to have caused the bulk request to fail with an error like this: ERROR WorkerSinkTask{id=kafka-connect-gds-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Bulk request failed. (org.apache.kafka.connect.runtime.WorkerSinkTask)

  3. There were no other errors, and there were no messages in the DLQ topic either.

  4. We manually compared the events that made it to Elasticsearch with the ones that didn't and found no difference, so we believe the Gateway Timeout error must be why the 96 documents never showed up in Elasticsearch. But we could be wrong.

Also, this issue should have resolved itself after the gateway error went away and the pods restarted, yet the counts are still off. So it seems the SinkTask failed to write to Elasticsearch but still committed the offsets for the consumer group? That would explain why, after the restart, Kafka Connect does not pick up the events that never made it to Elasticsearch. I would expect these two things to be done atomically.

So I was hoping to get some insight into what we could be doing wrong, or whether this is a gap in the current Kafka Connect implementation where messages are expected to be dropped while Elasticsearch is unavailable.

Here are our Kafka Connect settings:

{ "name": "${KC_ES_CONNECTOR_NAME}", "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "${KC_ES_DOMAIN_ENDPOINT}", "type.name": "kafkaconnect", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "schema.ignore": "true", "topics": "${KC_ES_TOPICS}", "behavior.on.null.values": "DELETE", "behavior.on.malformed.documents": "IGNORE", "batch.size": "500", "read.timeout.ms": "60000", "linger.ms": "100", "write.method": "upsert", /// During the test this was set to upsert, but for our use case now we have change it to insert "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "${KC_ES_NAMESPACE}.kc.myproduct.dlq", "errors.deadletterqueue.context.headers.enable": "true", "errors.log.enable": "true", "errors.log.include.messages": "true", "errors.retry.timeout": "-1", "errors.retry.delay.max.ms": "60000", "predicates": "isNullRecord", "predicates.isNullRecord.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone", "transforms": "dropNullRecords,transformserecords", "transforms.dropNullRecords.type": "org.apache.kafka.connect.transforms.Filter", "transforms.dropNullRecords.predicate": "isNullRecord", "transforms.transformserecords.type": "com.mycompany.kafka.connect.smt.TransformSERecord$Value", "transforms.transformserecords.transform": "true" }

We do have a custom SMT too, which does very basic transformations: adding a few new fields, ignoring certain events, and routing each record to the right Elasticsearch index by convention. All 10M events were identical, so the transformation behaved the same way for all of them, and we can confirm that from the logging we have in the SMT.

When this issue occurred, Kafka Connect was running with 1 worker on 1 pod (process), definitely under-provisioned for this type of test, but we have addressed that since. We are planning to repeat the test but were hoping for guidance on anything obvious we are missing here.

Regards, Vikas

vikasillumina avatar Jul 22 '21 19:07 vikasillumina

Hi there, any thoughts/suggestions on this? We really love what Kafka Connect does and it would be a shame to give up on it. @levzem, @avocader sorry for reaching out to you directly, but I was hoping to get any input. We can't be the only ones seeing this.

Thanks for your help in advance.

Regards, Vikas

vikasillumina avatar Jul 24 '21 23:07 vikasillumina

Hi! I have a similar problem: if all Elasticsearch instances are unavailable, some Kafka messages are lost. Kafka version: 2.7.0, kafka-connect-elasticsearch: 11.0.6.

Kafka Connect log:

[2021-08-04 12:02:30,727] WARN Bulk request 2 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2021-08-04 12:02:36,446] INFO WorkerSinkTask{id=kafka-sink-xxxxx-0} Committing offsets asynchronously using sequence number 44: {xxxxx-0=OffsetAndMetadata{offset=9, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:352)

ES Sink Connector config:

{
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "xxxxx",
    "behavior.on.null.values": "DELETE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "renameTopicToIndex",
    "transforms.renameTopicToIndex.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopicToIndex.regex": ".*",
    "transforms.renameTopicToIndex.replacement": "xxxxx",
    "connection.timeout.ms": "5000",
    "read.timeout.ms": "30000"
}

WORKAROUND

The main problem is that in this case org.elasticsearch.action.bulk.BulkProcessor processes messages asynchronously and doesn't propagate the error back to the WorkerSinkTask, which goes on to commit the topic's offsets. According to the source linked below, to make the ES BulkProcessor work synchronously we should set its ConcurrentRequests to zero. We can do that by setting `"max.in.flight.requests": "1"` in the connector config.

https://github.com/confluentinc/kafka-connect-elasticsearch/blob/ce275810b706989413ae35843f7d74bc5807c17c/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java#L140
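
A minimal sketch of the resulting config, assuming the same connector setup as above (topic and endpoint values are placeholders, and settings not relevant to the workaround are omitted):

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "xxxxx",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "behavior.on.null.values": "DELETE",
    // Assumption: with only one in-flight bulk request the underlying BulkProcessor
    // behaves effectively synchronously, so a failed bulk request surfaces to the
    // WorkerSinkTask before the next offset commit instead of being silently skipped.
    "max.in.flight.requests": "1"
}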

sechirkov avatar Aug 04 '21 12:08 sechirkov

Thanks @sechirkov for sharing your findings and the workaround. I will try this setting in my own side project, just for my own satisfaction that Kafka Connect works the way it's advertised. Unfortunately, for the actual project we have sadly punted on Kafka Connect and switched to an AWS Lambda that also reads from Kafka topics and writes to Elasticsearch. It's still in the testing phase but so far it has been stable. However, I am concerned about running into scale/concurrency problems that the Elasticsearch connector may have already dealt with, since we had to write that part of the code from scratch.

vikasillumina avatar Aug 08 '21 02:08 vikasillumina

Hey @vikasillumina @sechirkov, sharing some details as well.

I am using the Elasticsearch sink connector v13.1.0 to replicate some documents to ES (key-value store use case), and I also noticed some data consistency issues. However, I suspect they were caused by updates getting written out of order as opposed to messages being dropped.

I am using the following configuration:

{
    "write.method": "UPSERT",
    "flush.synchronously": "false",    // default
    "max.retries": "5",                // default
    "max.in.flight.requests": "5",     // default
    ....
}

// Single task, single worker

The docs read that "For the key-value store use case, it supports using keys from Kafka messages as document IDs in Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantees exactly once delivery."

The data inconsistency occurred after an outage in our Elastic cluster; during the outage, the connector retried requests several times and the container restarted several times as it kept running into unrecoverable exceptions.

It appears that for the connector to support UPSERTs by ID and guarantee in-order, exactly-once delivery, we need `"max.in.flight.requests": "1"`.
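
A minimal sketch of such a configuration, assuming the key-value store setup described above (connection and topic values are placeholders; the retry values are only illustrative):

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "xxxxx",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "write.method": "UPSERT",
    // Assumption: a single in-flight bulk request keeps updates to the same
    // document ID from overtaking each other and stops offset commits from
    // running ahead of a failed bulk request.
    "max.in.flight.requests": "1",
    "max.retries": "5",
    "retry.backoff.ms": "1000"
}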

jeguiguren-cohere avatar May 31 '23 03:05 jeguiguren-cohere

@jeguiguren-cohere thanks for reaching out and sharing the details of your findings. Did these settings help make sure none of the events are lost? I would also recommend testing with a large load like ours (10M events) to see if it really works. If you don't mind, please share all of your Kafka Connect settings; I am just curious at this point. As mentioned earlier, we had to give up on Kafka Connect and wrote our own Lambda from scratch, and it works really well. But I can see other use cases for Kafka Connect; it felt wasteful to have to write Lambda code dealing with all the issues that the Kafka Connect Elasticsearch connector had already solved. :)

vikasillumina avatar May 31 '23 05:05 vikasillumina