kafka-connect-elasticsearch
ES Document version is set to Kafka record offset
connector version: 11.0.0
When using "key.ignore": false and "write.method": "insert", the connector sets the document version (the version shown in Elasticsearch) to the record's Kafka offset, for a reason I cannot quite understand.
This sometimes leads to version conflict errors, and the message is not indexed in Elasticsearch:
[2021-02-12 02:18:07,133] WARN Ignoring version conflicts for items: [Key{index-000042/_doc/1355b1bc-2de2-4591-922a-890407bf3308}, Key{index-000042/_doc/c3d9f18e-91ab-49f0-9892-ffbd928dd783}, Key{index-000042/_doc/81928de3-0f77-4e22-a6f0-fd24088dceb6}, Key{index-000042/_doc/a5733fef-e8a6-4193-89cc-ef9b0f9c39d5}, Key{index-000042/_doc/f04a6eaa-a4d2-4707-b1ba-7f696a887251}, Key{index-000042/_doc/f2aa9c5c-52f3-4435-92b4-e7f77fe8a748}, Key{index-000042/_doc/610ab60f-2045-4238-83e9-e19991b7d732}, Key{index-000042/_doc/599d8291-bde2-4fd3-91a1-34706591f04b}, Key{index-000042/_doc/3767cc22-e894-4614-8e98-07de4becd170}, Key{index-000042/_doc/4ec40fff-707b-40f8-a308-d184be29b9e7}, Key{index-000042/_doc/78e113b1-f2fe-4348-9e99-afc926eb4caa}, Key{index-000042/_doc/9078d606-5280-47f0-9590-ae274693874e}, Key{index-000042/_doc/cd23d612-8013-4890-90df-d7b4a17bc80c}, Key{index-000042/_doc/4b34770f-6304-4beb-942d-dd98cf875ede}, Key{index-000042/_doc/bd2e8889-5597-4fa2-a210-c7483518ca27}, Key{index-000042/_doc/aabdbd33-742e-49bb-adef-cdc3d4f00b91}, Key{index-000042/_doc/03c52672-d98c-4f13-a4cc-b7d6c9bc6ef5}, Key{index-000042/_doc/1dde610d-2d52-403d-8559-2e83ba1024c5}, Key{index-000042/_doc/94db6a14-26d7-47ea-a80d-9fa41840a954}, Key{index-000042/_doc/07b97061-5842-4e77-b686-20e5af239e59}
This error can happen when the version provided by the connector is lower than (or equal to) the version of the existing document with the same _id in Elasticsearch. That means the record's Kafka offset was not higher than the offset of a previous record that produced the same _id. This is possible if the Kafka topic has multiple partitions, if partitions are created on the fly, or if partitions are deleted and created again. In my case it happens quite often, and it affects the connector's performance.
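To illustrate the failure mode, here is a minimal toy model of how external versioning treats one key written from two partitions. The class and method names are hypothetical and this is not the Elasticsearch client API; it only assumes Elasticsearch's documented rule for version_type=external, where a write is accepted only if the supplied version is strictly greater than the stored one.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of Elasticsearch's "external" version check.
// Hypothetical names; not the real Elasticsearch client API.
public class ExternalVersionDemo {
    private final Map<String, Long> storedVersions = new HashMap<>();

    // With version_type=external a write is accepted only if the supplied
    // version is strictly greater than the version stored for the _id.
    boolean indexWithExternalVersion(String id, long version) {
        Long current = storedVersions.get(id);
        if (current != null && version <= current) {
            return false; // version conflict: the document is not written
        }
        storedVersions.put(id, version);
        return true;
    }

    public static void main(String[] args) {
        ExternalVersionDemo es = new ExternalVersionDemo();
        // Same key, first written from partition 0 at offset 1200 ...
        boolean first = es.indexWithExternalVersion("1355b1bc", 1200L);
        // ... then a newer record from partition 1, whose offsets are still low.
        boolean second = es.indexWithExternalVersion("1355b1bc", 7L);
        System.out.println("first=" + first + " second=" + second);
    }
}
```

Running main prints first=true second=false: the newer record from the low-offset partition is rejected, which matches the warnings above.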
The connector should have a new boolean parameter called version.ignore. When true, the connector would never add a version to the document. When false (the default), the current behavior would apply.
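The proposed semantics fit in a small decision function. This is a sketch with hypothetical names (VersionDecision, usesExternalVersion), assuming that, as described in this issue, the current code only attaches a version when key.ignore is false and write.method is insert.

```java
// Hypothetical helper mirroring the proposed version.ignore semantics.
public class VersionDecision {
    // Returns true when the connector would send the Kafka offset
    // as an external document version.
    static boolean usesExternalVersion(boolean versionIgnore, boolean keyIgnore, String writeMethod) {
        if (versionIgnore) {
            return false; // proposed flag: never send a version
        }
        // Current behavior: only key-based ids written with "insert" get a version.
        return !keyIgnore && "insert".equals(writeMethod);
    }

    public static void main(String[] args) {
        System.out.println(usesExternalVersion(false, false, "insert")); // true
        System.out.println(usesExternalVersion(true, false, "insert"));  // false
        System.out.println(usesExternalVersion(false, true, "insert"));  // false
        System.out.println(usesExternalVersion(false, false, "upsert")); // false
    }
}
```

Only the first combination produces a version, so setting version.ignore=true removes the conflicts without changing how _id values are derived.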
The new config:
```java
public static final String IGNORE_VERSION_CONFIG = "version.ignore";
private static final String IGNORE_VERSION_DOC =
    "Whether to ignore the record offset when forming the Elasticsearch document version."
    + " When this is set to ``true``, no document version will be provided to Elasticsearch."
    + " When this is set to ``false``"
    + " with ``" + IGNORE_KEY_CONFIG + "`` set to ``false``"
    + " and ``" + WRITE_METHOD_CONFIG + "`` set to ``" + WriteMethod.INSERT + "``,"
    + " the version provided will be the record offset in Kafka.";
private static final String IGNORE_VERSION_DISPLAY = "Ignore version mode";
private static final boolean IGNORE_VERSION_DEFAULT = false;
```
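Its usage:
```java
private DocWriteRequest<?> maybeAddExternalVersioning(
    DocWriteRequest<?> request,
    SinkRecord record
) {
  // Skip external versioning entirely when version.ignore=true.
  // config.ignoreVersion() is the new accessor for version.ignore (not shown).
  if (!config.ignoreVersion()) {
    // Existing behavior: when _id comes from the record key,
    // use the Kafka offset as an external document version.
    if (!config.shouldIgnoreKey(record.topic())) {
      request.versionType(VersionType.EXTERNAL);
      request.version(record.kafkaOffset());
    }
  }
  return request;
}
```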
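To check the branching without a running cluster, the method can be exercised against simple stand-ins. FakeRequest, FakeRecord and FakeConfig below are hypothetical substitutes for DocWriteRequest, SinkRecord and the connector config, so this only models the proposed logic, not the real classes. It needs Java 16+ for records.

```java
import java.util.function.Predicate;

// Stdlib-only stand-ins (hypothetical) used to exercise the proposed branching.
public class MaybeVersionDemo {
    enum VersionType { INTERNAL, EXTERNAL }

    static final class FakeRequest {
        VersionType versionType = VersionType.INTERNAL;
        Long version = null; // null means "no version sent"
    }

    record FakeRecord(String topic, long kafkaOffset) {}

    record FakeConfig(boolean ignoreVersion, Predicate<String> ignoreKeyForTopic) {
        boolean shouldIgnoreKey(String topic) { return ignoreKeyForTopic.test(topic); }
    }

    // Same branching as the proposed maybeAddExternalVersioning.
    static FakeRequest maybeAddExternalVersioning(FakeConfig config, FakeRequest request, FakeRecord record) {
        if (!config.ignoreVersion()) {
            if (!config.shouldIgnoreKey(record.topic())) {
                request.versionType = VersionType.EXTERNAL;
                request.version = record.kafkaOffset();
            }
        }
        return request;
    }

    public static void main(String[] args) {
        FakeRecord rec = new FakeRecord("orders", 42L);
        FakeRequest current = maybeAddExternalVersioning(new FakeConfig(false, t -> false), new FakeRequest(), rec);
        FakeRequest proposed = maybeAddExternalVersioning(new FakeConfig(true, t -> false), new FakeRequest(), rec);
        System.out.println(current.versionType + " " + current.version);   // EXTERNAL 42
        System.out.println(proposed.versionType + " " + proposed.version); // INTERNAL null
    }
}
```

With version.ignore=false the request carries the offset as an external version; with version.ignore=true it is left untouched, so Elasticsearch falls back to its own internal versioning.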
I built a new version this way and it works fine. How can I create a pull request?
I implemented the solution in my code and it works like a charm. It saved my life. Thanks a lot for this @michaelfortin.
I am facing the same issue. When can we expect a fix for this?
Hi, I have written a PR for this feature and tested the package locally. This is the PR: https://github.com/confluentinc/kafka-connect-elasticsearch/pull/628
What can be done to get this PR merged?
I will probably have to create a Jira ticket for this to be noticed...
+1 for getting this PR merged.