kafka-connect-elasticsearch
Empty string key results in unusable connector
We bumped into this issue by accident, assuming that we were passing through a null-keyed message.
The connector code - possibly in combination with the configuration property drop.invalid.message - takes good care of that use case.
Instead, we passed a message with a "" (empty string) key. This resulted in unexpected behaviour: the connector gets stuck in a continuous index-fail-rewind loop.
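For illustration, producing such a record is trivial; a minimal sketch (the broker address is an assumption, the topic matches the config further down):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EmptyKeyRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key is "" (empty string) rather than null: it passes the null check
            // in DataConverter.convertKey but is rejected by the Elasticsearch bulk API.
            producer.send(new ProducerRecord<>("foo3", "", "{\"foo\":\"bar\"}"));
        }
    }
}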
In io.confluent.connect.elasticsearch.DataConverter.convertKey, a ConnectException("Key is used as document id and can not be null.") is thrown only when the key is null. An empty string key passes through, and the resulting BulkRequest bounces back from Elasticsearch, producing an infinite loop and a running but unusable connector: it is no longer capable of processing other messages. Even worse, the topic becomes unsuited for processing by any ElasticsearchSinkConnector, and would need to be streamed to another topic - transforming the key along the way - before a sink connector could work on it.
Passing "" keys is of course not wise practice, but it leads to a tricky situation: the loop is hard to detect because the connector keeps running. An easy fix would be to check for an empty/blank string in DataConverter.convertKey.
A work-around could be to filter/prepare/error-log such messages using an SMT, but I don't see a stock SMT that allows setting a field value (or the key, in this case) based on a condition.
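That said, a small custom SMT for this would be straightforward; a hypothetical sketch (not an existing transform), relying on the Connect runtime discarding records for which apply() returns null:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class DropBlankKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        Object key = record.key();
        if (key instanceof String && ((String) key).trim().isEmpty()) {
            return null; // returning null tells the runtime to drop the record
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}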
Any suggestions?
The logs:
ERROR Failed to execute batch 9 of 1 records after total of 6 attempt(s) (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: {"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:438)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)
[2020-05-19 03:10:40,497] ERROR WorkerSinkTask{id=es-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: {"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:438)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)
[2020-05-19 03:10:40,497] INFO [Consumer clientId=connector-consumer-es-0, groupId=connect-es] Seeking to offset 2 for partition foo3-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2020-05-19 03:10:40,498] ERROR WorkerSinkTask{id=es-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Bulk request failed: {"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:438)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)
[2020-05-19 03:10:40,577] ERROR Can't convert record from topic/partition/offset foo3/0/2. Error message: Key is used as document id and can not be null. (io.confluent.connect.elasticsearch.ElasticsearchWriter)
[2020-05-19 03:10:40,577] ERROR Can't convert record from topic/partition/offset foo3/0/3. Error message: Bulk request failed: {"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: an id must be provided if version type or value are set;"} (io.confluent.connect.elasticsearch.ElasticsearchWriter)
Forgot to add the config:
{
  "name": "es",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": true,
    "topics": "foo3",
    "connection.url": "http://es:9200",
    "type.name": "_doc",
    "drop.invalid.message": true,
    "behavior.on.null.values": "delete"
  }
}
+1, facing a similar issue with the Mongo CDC connector
I hit the same problem. I hoped that errors.tolerance=all would skip the errors, but it didn't: that setting only covers failures in the converter and transformation stages, not exceptions thrown from the connector's put(), which is where this bulk failure surfaces.
This can completely block the connector at any time. Not very production-friendly.
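For reference, the error-handling block I tried looked like this (the dead letter queue topic name is made up). It routes converter and SMT failures to a DLQ, but does not catch the bulk-indexing failure above:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.es-sink",
"errors.deadletterqueue.context.headers.enable": "true"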
I found a workaround: it is possible to filter out records with null keys using this jmespath plugin together with org.apache.kafka.connect.transforms.Filter. It allows conditions on string primitives too, something that is not possible with io.confluent.connect.transforms.Filter.
Here is my example config:
{
  "name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "tasks.max": "1",
    "type.name": "_doc",
    "topics": "targetTopic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "false",
    "transforms": "DropNullKey",
    "transforms.DropNullKey.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.DropNullKey.predicate": "NullKey",
    "predicates": "NullKey",
    "predicates.NullKey.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Key",
    "predicates.NullKey.query": "@ == null"
  }
}
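Since this issue is about empty string keys rather than null keys, the same approach should extend with a second predicate; an untested sketch, assuming the plugin accepts an empty raw string literal in the query:

"transforms": "DropNullKey,DropBlankKey",
"transforms.DropBlankKey.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.DropBlankKey.predicate": "BlankKey",
"predicates": "NullKey,BlankKey",
"predicates.BlankKey.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Key",
"predicates.BlankKey.query": "@ == ''"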