kafka-connect-elasticsearch
Fix: use external versioning for deletes
Problem
When the Elasticsearch indexer is highly concurrent, record keys are used as document ids, and the indexer is configured to delete documents on null values, delete requests sent without external versioning can corrupt the data. Requests for the same id may be applied out of order, so documents that should not be deleted end up being deleted.
Solution
Use "version_type": "external" for delete requests, and use the topic offset as the version number.
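To illustrate why an external version protects against out-of-order deletes, here is a minimal sketch of the versioning rule as a toy in-memory store. It is not the connector's code and not the Elasticsearch client API: the class `ExternalVersionSketch` and its methods are hypothetical, and it assumes the store remembers the version of a deleted document (Elasticsearch does so only for a limited time).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of external versioning; hypothetical, for illustration only. */
class ExternalVersionSketch {
    // Last accepted version per id, kept even after a delete (acts as a tombstone).
    private final Map<String, Long> versions = new HashMap<>();
    // Live documents per id.
    private final Map<String, String> docs = new HashMap<>();

    /** Applies an index request only if its version is newer than the stored one. */
    boolean index(String id, String doc, long version) {
        if (isStale(id, version)) {
            return false;
        }
        versions.put(id, version);
        docs.put(id, doc);
        return true;
    }

    /** Applies a delete request only if its version is newer than the stored one. */
    boolean delete(String id, long version) {
        if (isStale(id, version)) {
            return false;
        }
        versions.put(id, version);
        docs.remove(id);
        return true;
    }

    /** A request is stale when its version is not strictly greater than the stored one. */
    private boolean isStale(String id, long version) {
        Long current = versions.get(id);
        return current != null && version <= current;
    }

    String get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        ExternalVersionSketch store = new ExternalVersionSketch();
        // Topic offsets 0 (index), 1 (delete), 2 (index) arrive in the order 0, 2, 1.
        store.index("k", "v0", 0);
        store.index("k", "v2", 2);
        boolean applied = store.delete("k", 1); // late delete carries the older offset
        System.out.println(applied + " " + store.get("k")); // prints "false v2"
    }
}
```

Without the version check, the late delete in `main` would remove the newer document, which is the corruption described in the Problem section.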
Does this solution apply anywhere else?
- [ ] yes
- [x] no
Test Strategy
Create an ElasticsearchSinkTask with these properties:
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG, Integer.toString(numOfRecords));
props.put(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG, "1");
props.put(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG, "1");
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete");
Then produce a sequence of records with the same key in which roughly half have null values and the other half are non-null documents carrying a version number, ending with a document that has the highest version number. Start indexing and expect the Elasticsearch index to contain exactly one document, with "message" equal to numOfRecords.
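The shape of that record sequence can be sketched as follows. This is an illustrative stand-in rather than the test from this PR: `TestSequenceSketch`, `Rec`, and `build` are hypothetical names, a plain class replaces the Kafka Connect record type, and the strict alternation of null and non-null values is an assumption about how "around half" might be produced.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical generator for the same-key record sequence described above. */
class TestSequenceSketch {
    /** Stand-in for a sink record: key, nullable value, and topic offset. */
    static final class Rec {
        final String key;
        final Integer message; // null models a record with a null value (a delete)
        final long offset;

        Rec(String key, Integer message, long offset) {
            this.key = key;
            this.message = message;
            this.offset = offset;
        }
    }

    /** Builds numOfRecords records (numOfRecords >= 1) that all share one key. */
    static List<Rec> build(int numOfRecords) {
        List<Rec> records = new ArrayList<>();
        for (int i = 0; i < numOfRecords - 1; i++) {
            // Assumption: alternate null values and documents to get roughly half of each.
            Integer message = (i % 2 == 0) ? null : Integer.valueOf(i + 1);
            records.add(new Rec("key", message, i));
        }
        // The last record is a document carrying the highest number.
        records.add(new Rec("key", Integer.valueOf(numOfRecords), numOfRecords - 1));
        return records;
    }

    public static void main(String[] args) {
        List<Rec> records = build(10);
        Rec last = records.get(records.size() - 1);
        // The record with the highest offset decides the expected final state.
        System.out.println("expected message: " + last.message + " at offset " + last.offset);
    }
}
```

Because the final record has both the highest offset and a non-null value, a correct indexer should leave exactly one document regardless of the order in which the concurrent requests complete.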
Note:
To observe the previous faulty behavior, comment out the code that sets the external version on the delete request and run the integration test introduced in this PR several times; the results should be flaky.
Testing done:
- [x] Unit tests
- [ ] Integration tests
- [ ] System tests
- [ ] Manual tests
Release Plan
It is safe to release and backport the code.
It looks like @dainiusjocas hasn't signed our Contributor License Agreement, yet.
The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. Wikipedia
You can read and sign our full Contributor License Agreement here.
Once you've signed reply with [clabot:check] to prove it.
Appreciation of efforts,
clabot
[clabot:check]
@confluentinc It looks like @dainiusjocas just signed our Contributor License Agreement. :+1:
Always at your service,
clabot
A blog post that explains why this PR is a bug fix: https://www.jocas.lt/blog/post/kc_es_data_consistency/
test this please
What is the status of this change, and what prevents it from moving forward? The issue, if it still exists, is concerning.
@sp-gupta Would you be able to comment here? Or at least get someone who could?
Thanks!
The issue was fixed with other changes a couple of years ago.