kafka-connect-elasticsearch

ES Document version is set to Kafka record offset

Open michaelfortin opened this issue 4 years ago • 6 comments

connector version: 11.0.0

When using "key.ignore": false and "write.method": "insert", the connector sets the document version (the version appearing in Elasticsearch) to the record offset in Kafka, for a reason that I cannot quite understand.

This sometimes leads to a version conflict error, and the message is not indexed in Elasticsearch.

[2021-02-12 02:18:07,133] WARN Ignoring version conflicts for items: [Key{index-000042/_doc/1355b1bc-2de2-4591-922a-890407bf3308}, Key{index-000042/_doc/c3d9f18e-91ab-49f0-9892-ffbd928dd783}, Key{index-000042/_doc/81928de3-0f77-4e22-a6f0-fd24088dceb6}, Key{index-000042/_doc/a5733fef-e8a6-4193-89cc-ef9b0f9c39d5}, Key{index-000042/_doc/f04a6eaa-a4d2-4707-b1ba-7f696a887251}, Key{index-000042/_doc/f2aa9c5c-52f3-4435-92b4-e7f77fe8a748}, Key{index-000042/_doc/610ab60f-2045-4238-83e9-e19991b7d732}, Key{index-000042/_doc/599d8291-bde2-4fd3-91a1-34706591f04b}, Key{index-000042/_doc/3767cc22-e894-4614-8e98-07de4becd170}, Key{index-000042/_doc/4ec40fff-707b-40f8-a308-d184be29b9e7}, Key{index-000042/_doc/78e113b1-f2fe-4348-9e99-afc926eb4caa}, Key{index-000042/_doc/9078d606-5280-47f0-9590-ae274693874e}, Key{index-000042/_doc/cd23d612-8013-4890-90df-d7b4a17bc80c}, Key{index-000042/_doc/4b34770f-6304-4beb-942d-dd98cf875ede}, Key{index-000042/_doc/bd2e8889-5597-4fa2-a210-c7483518ca27}, Key{index-000042/_doc/aabdbd33-742e-49bb-adef-cdc3d4f00b91}, Key{index-000042/_doc/03c52672-d98c-4f13-a4cc-b7d6c9bc6ef5}, Key{index-000042/_doc/1dde610d-2d52-403d-8559-2e83ba1024c5}, Key{index-000042/_doc/94db6a14-26d7-47ea-a80d-9fa41840a954}, Key{index-000042/_doc/07b97061-5842-4e77-b686-20e5af239e59}

This error can happen if the version provided by the connector is lower than the version of the document with the same _id in Elasticsearch. When this happens, it means the record offset in Kafka was lower than that of a previous record generating the same _id. This situation is possible if the Kafka topic has multiple partitions, if partitions are created on the fly, or if the partitions are deleted and created again. In my case it happens quite often, and it affects the connector's performance.
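To make the failure mode concrete, here is a minimal sketch of external versioning as a toy in-memory model. It is not the connector's code and does not talk to Elasticsearch; the class name `ExternalVersionDemo` and the offsets are made up for illustration. The only behavior it assumes is that, with external versioning, a write is accepted only when the supplied version is strictly greater than the stored one.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of external versioning; hypothetical, not the connector's code. */
class ExternalVersionDemo {
    // Latest accepted version per document _id, standing in for the index.
    private final Map<String, Long> versions = new HashMap<>();

    /**
     * Mimics an index request with an external version: the write is accepted
     * only if the supplied version is strictly greater than the stored one.
     */
    boolean index(String id, long externalVersion) {
        Long current = versions.get(id);
        if (current != null && externalVersion <= current) {
            return false; // version conflict, the document is not written
        }
        versions.put(id, externalVersion);
        return true;
    }

    public static void main(String[] args) {
        ExternalVersionDemo es = new ExternalVersionDemo();
        // The same _id arrives from two partitions with unrelated offsets.
        System.out.println(es.index("doc-1", 500L)); // partition 0, offset 500
        System.out.println(es.index("doc-1", 12L));  // partition 1, offset 12
    }
}
```

The second write is rejected even though it is the newer record, because offsets from different partitions are not comparable.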

The connector should have a new boolean parameter called version.ignore. When true, the connector would not attempt to add a version to the document under any circumstances. When false (default), the current behavior would take place.

The new config

  public static final String IGNORE_VERSION_CONFIG = "version.ignore";
  private static final String IGNORE_VERSION_DOC =
      "Whether to ignore the record offset when forming the Elasticsearch document version."
          + " When this is set to ``true``, no document version will be provided to Elasticsearch."
          + " When this is set to ``false``"
          + " with ``" + IGNORE_KEY_CONFIG + "`` set to ``false``"
          + " and ``" + WRITE_METHOD_CONFIG + "`` set to ``" + WriteMethod.INSERT + "``,"
          + " the version provided will be the record offset in Kafka.";
  private static final String IGNORE_VERSION_DISPLAY = "Ignore version mode";
  private static final boolean IGNORE_VERSION_DEFAULT = false;

Its usage

  private DocWriteRequest<?> maybeAddExternalVersioning(
      DocWriteRequest<?> request,
      SinkRecord record
  ) {
    if (!config.ignoreVersion()) {
      if (!config.shouldIgnoreKey(record.topic())) {
        request.versionType(VersionType.EXTERNAL);
        request.version(record.kafkaOffset());
      }
    }
    return request;
  }
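As a rough restatement, the guard in the method above reduces to a condition on two flags. The sketch below is standalone and hypothetical (the class `VersioningDecision` and its method are not part of the connector); it only mirrors the two checks shown in the snippet so the combinations are easy to see.

```java
/** Standalone restatement of the proposed guard; all names are hypothetical. */
class VersioningDecision {
    /**
     * Returns true when the Kafka offset would be attached as an external
     * version, i.e. when neither the version nor the key is ignored.
     */
    static boolean usesExternalVersion(boolean ignoreVersion, boolean ignoreKey) {
        return !ignoreVersion && !ignoreKey;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean ignoreVersion : flags) {
            for (boolean ignoreKey : flags) {
                System.out.println("version.ignore=" + ignoreVersion
                        + ", key.ignore=" + ignoreKey
                        + " -> external version: "
                        + usesExternalVersion(ignoreVersion, ignoreKey));
            }
        }
    }
}
```

Only the combination with both flags set to false attaches the offset, which matches the default behavior described above.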

I built a new version this way and it works fine. How can I create a pull request?

michaelfortin avatar Feb 23 '21 21:02 michaelfortin

I implemented the solution in my code and it works like a charm. It saved my life. Thanks a lot for this @michaelfortin.

jacobb35 avatar Mar 25 '21 15:03 jacobb35

I am also facing the same issue. When can we expect a fix for this?

Harish-Sridhar avatar Apr 07 '21 10:04 Harish-Sridhar

Hi, I have written a PR for this feature and tested the package locally. This is the PR: https://github.com/confluentinc/kafka-connect-elasticsearch/pull/628

yzia2000 avatar Apr 16 '22 09:04 yzia2000

What can be done to get this PR merged?

ChrisIgel avatar Feb 16 '23 16:02 ChrisIgel

> What can be done to get this PR merged?

I will probably have to create a Jira ticket for this to be noticed...

yzia2000 avatar May 19 '23 08:05 yzia2000

+++ For PR to get merged

Mahanmmi avatar Aug 17 '23 17:08 Mahanmmi