kafka-connect-elasticsearch

Fix: use external versioning for deletes

Open • dainiusjocas opened this pull request 5 years ago • 5 comments

Problem

If the Elasticsearch indexer is highly concurrent, record keys are used as document IDs, and the indexer is configured to delete documents on null values, then not using external versioning for delete requests can corrupt the data: documents that should not be deleted end up being deleted.
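A minimal, hypothetical model of the failure mode. It assumes, as the PR title implies, that index requests already carry an external version (the Kafka offset) while delete requests do not, and that requests for the same key can be applied out of offset order when many bulk requests are in flight. `UnversionedDeleteRace`, `Op`, and `apply` are illustrative names only; this is not the connector's code and it does not talk to Elasticsearch.

```java
import java.util.List;

/**
 * Hypothetical model of the bug: index requests are versioned by Kafka offset,
 * delete requests are not, and requests for one key may arrive out of order.
 */
public class UnversionedDeleteRace {

    /** One request for a single document id; source == null means delete. */
    static final class Op {
        final long offset;
        final String source;

        Op(long offset, String source) {
            this.offset = offset;
            this.source = source;
        }
    }

    /** Applies requests in arrival order and returns the surviving document, or null. */
    static String apply(List<Op> arrivalOrder) {
        Long storedVersion = null;  // highest offset accepted by a versioned index request
        String storedSource = null; // the live document, or null if none
        for (Op op : arrivalOrder) {
            if (op.source == null) {
                storedSource = null; // unversioned delete: removes whatever is stored
            } else if (storedVersion == null || op.offset > storedVersion) {
                storedVersion = op.offset; // versioned index: only newer offsets win
                storedSource = op.source;
            }
        }
        return storedSource;
    }

    public static void main(String[] args) {
        // Kafka order for one key: offset 0 = "a", offset 1 = null (delete), offset 2 = "b".
        // With several bulk requests in flight, the delete can be applied last.
        List<Op> arrival = List.of(new Op(0, "a"), new Op(2, "b"), new Op(1, null));
        System.out.println(apply(arrival)); // prints "null" although "b" is the latest value
    }
}
```

The stale delete (offset 1) wins simply because it arrived last, which is exactly the "records that should not be deleted end up being deleted" outcome.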

Solution

Use "version_type": "external" for delete requests, and use the record's topic offset as the version number.
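A sketch of the external-versioning rule the fix relies on, modeled in memory. With `version_type` set to `external`, Elasticsearch accepts a write only if its version is strictly greater than the stored one, and a versioned delete leaves a tombstone carrying its version, so a stale delete (or a stale index after a delete) is rejected with a version conflict. `ExternalVersionStore` and its methods are hypothetical names. Real Elasticsearch keeps delete tombstones only for `index.gc_deletes` (60 seconds by default), whereas this sketch keeps them forever.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of version_type=external: a write is accepted
 * only if its version is strictly greater than the stored version, and a
 * delete leaves a versioned tombstone.
 */
public class ExternalVersionStore {

    /** Stored state for one id; source == null marks a tombstone left by a delete. */
    static final class Entry {
        final long version;
        final String source;

        Entry(long version, String source) {
            this.version = version;
            this.source = source;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Returns true if the write is accepted, false on a version conflict. */
    private boolean write(String id, long version, String source) {
        Entry current = entries.get(id);
        if (current != null && version <= current.version) {
            return false; // conflict: stored document or tombstone is at least as new
        }
        entries.put(id, new Entry(version, source));
        return true;
    }

    boolean index(String id, long version, String source) {
        return write(id, version, source);
    }

    boolean delete(String id, long version) {
        return write(id, version, null); // tombstone remembers the delete's version
    }

    /** Returns the live document, or null if the id is absent or tombstoned. */
    String get(String id) {
        Entry e = entries.get(id);
        return e == null ? null : e.source;
    }

    public static void main(String[] args) {
        ExternalVersionStore es = new ExternalVersionStore();
        // Out-of-order arrival for one key; version = Kafka offset.
        es.index("key-1", 0, "a");
        es.index("key-1", 2, "b");
        boolean deleted = es.delete("key-1", 1); // rejected because 1 <= 2
        System.out.println(deleted + " " + es.get("key-1")); // prints "false b"
    }
}
```

Because every request for a key now carries its offset, the final state depends only on the highest offset, not on the order in which concurrent requests happen to arrive.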

Does this solution apply anywhere else?
  • [ ] yes
  • [x] no

Test Strategy

Create an ElasticsearchSinkTask with these properties:

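    Map<String, String> props = new HashMap<>();
    // Use record keys as document IDs
    props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "false");
    // One record per bulk request, with up to numOfRecords requests in flight at once
    props.put(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG, Integer.toString(numOfRecords));
    props.put(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG, "1");
    props.put(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG, "1");
    // Turn records with null values into delete requests
    props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");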

Then produce a sequence of records that all share the same key: roughly half have null values, the other half are non-null documents carrying a version number, and the last record is a document with the highest version number. Run the indexing and expect the Elasticsearch index to contain exactly one document, with "message" equal to numOfRecords.
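The test strategy can also be sketched without a cluster. This hypothetical simulation (`DeleteVersioningScenario`, `records`, and `apply` are illustrative names, not part of the connector or its test suite) builds the described record sequence for one key, shuffles it to stand in for concurrent in-flight requests, and applies it with and without external versioning on deletes. It assumes index requests are always versioned by offset and simplifies an unversioned delete to "remove the live document without touching the recorded version".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical simulation of the test strategy (not the connector's integration test).
 * Versions are Kafka offsets; index requests are always externally versioned.
 */
public class DeleteVersioningScenario {

    /** One record for the shared key; message == null means a null value (delete). */
    static final class Op {
        final long version;
        final String message;

        Op(long version, String message) {
            this.version = version;
            this.message = message;
        }
    }

    /** Offsets 0..n-2 are random nulls or documents; the last offset is a document with message n. */
    static List<Op> records(int numOfRecords, Random rnd) {
        List<Op> ops = new ArrayList<>();
        for (int offset = 0; offset < numOfRecords - 1; offset++) {
            // roughly half nulls, half documents whose message is offset + 1
            ops.add(new Op(offset, rnd.nextBoolean() ? null : Integer.toString(offset + 1)));
        }
        // last record: the highest version, with message == numOfRecords
        ops.add(new Op(numOfRecords - 1, Integer.toString(numOfRecords)));
        return ops;
    }

    /** Applies ops in arrival order; returns the surviving message, or null if nothing is left. */
    static String apply(List<Op> arrival, boolean versionDeletes) {
        Long storedVersion = null;   // highest accepted version (document or tombstone)
        String storedMessage = null; // null when no live document exists
        for (Op op : arrival) {
            boolean stale = storedVersion != null && op.version <= storedVersion;
            if (op.message == null && !versionDeletes) {
                storedMessage = null;       // unversioned delete: removes the live document
            } else if (!stale) {
                storedVersion = op.version; // versioned write accepted
                storedMessage = op.message; // null here means a versioned delete (tombstone)
            }
        }
        return storedMessage;
    }

    public static void main(String[] args) {
        int numOfRecords = 20;
        int trials = 1000;
        Random rnd = new Random(42);
        String expected = Integer.toString(numOfRecords);
        int failuresUnversioned = 0;
        int failuresVersioned = 0;
        for (int t = 0; t < trials; t++) {
            List<Op> arrival = records(numOfRecords, rnd);
            Collections.shuffle(arrival, rnd); // concurrent in-flight requests land in any order
            if (!expected.equals(apply(arrival, false))) failuresUnversioned++;
            if (!expected.equals(apply(arrival, true))) failuresVersioned++;
        }
        System.out.println("unversioned deletes: " + failuresUnversioned + "/" + trials + " runs failed");
        System.out.println("versioned deletes:   " + failuresVersioned + "/" + trials + " runs failed");
    }
}
```

With versioned deletes the last record always wins, because its version is higher than every other, so that run reports 0 failures; the unversioned count varies with the random arrival orders, mirroring the flaky results described for the integration test.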

Note:

To observe the previous faulty behavior, comment out the code that sets the external version on the delete request and run the integration test introduced in this PR several times; the results should be flaky.

Testing done:
  • [x] Unit tests
  • [ ] Integration tests
  • [ ] System tests
  • [ ] Manual tests

Release Plan

It is safe to release and backport the code.

dainiusjocas • Jun 30, 2020

It looks like @dainiusjocas hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence (source: Wikipedia).

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

ghost • Jun 30, 2020

[clabot:check]

dainiusjocas • Jun 30, 2020

@confluentinc It looks like @dainiusjocas just signed our Contributor License Agreement. :+1:

Always at your service,

clabot

ghost • Jun 30, 2020

A blog post that explains why this PR is a bug fix: https://www.jocas.lt/blog/post/kc_es_data_consistency/

dainiusjocas • Oct 3, 2020

test this please

dainiusjocas • Oct 26, 2020

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

cla-assistant[bot] • Sep 11, 2023

What is the status of this change, and what is preventing it from moving forward? The issue (if it still exists) is concerning.

yeikel • Dec 6, 2023

@sp-gupta Would you be able to comment here? Or at least get someone who could?

Thanks!

yeikel • Dec 6, 2023

The issue was fixed by other changes a couple of years ago.

dainiusjocas • Dec 8, 2023