flink-connector-elasticsearch

[FLINK-38371][Connectors / ElasticSearch] Add `sink.retry-on-conflicts` option to Elasticsearch Sink

Open • Myracle opened this pull request • 1 comment

This pull request introduces a new configuration option, sink.retry-on-conflicts, for the Elasticsearch table sink. This option allows users to specify the number of times to retry an update request when a version conflict occurs.

In high-throughput streaming scenarios, concurrent updates to the same document ID can cause version conflicts in Elasticsearch. The conflicting update request then fails, and that write can be lost. With this option set, a conflicting update is retried automatically (Elasticsearch re-reads the document and reapplies the change), which makes the connector more resilient to such conflicts.
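The retry behavior is an instance of optimistic concurrency control: read the document and its version, apply the change, write only if the version is unchanged, and retry after a conflict. The stdlib-only Java sketch below simulates that loop in memory. `InMemoryIndex`, `update`, and the simulated concurrent writer are hypothetical and are not the connector's or the Elasticsearch client's API. It assumes that a retry count of N allows N + 1 attempts in total, and that a negative count (like the option's default of -1) means a single attempt.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryOnConflictSketch {

    /** A document paired with a version number (a stand-in for Elasticsearch's document version). */
    record Versioned(Map<String, Object> doc, long version) {}

    /** Hypothetical in-memory index used only to illustrate version conflicts. */
    static class InMemoryIndex {
        private final Map<String, Versioned> docs = new HashMap<>();
        /** How many upcoming writes a simulated concurrent writer will slip in ahead of us. */
        int pendingConcurrentWrites = 0;

        Versioned get(String id) {
            return docs.getOrDefault(id, new Versioned(new HashMap<>(), 0L));
        }

        /** Writes only if the stored version still matches; returns false on a version conflict. */
        boolean compareAndSet(String id, long expectedVersion, Map<String, Object> newDoc) {
            if (pendingConcurrentWrites > 0) {
                // Simulate another writer updating the same id between our read and our write.
                pendingConcurrentWrites--;
                Versioned cur = get(id);
                docs.put(id, new Versioned(cur.doc(), cur.version() + 1));
            }
            if (get(id).version() != expectedVersion) {
                return false;
            }
            docs.put(id, new Versioned(newDoc, expectedVersion + 1));
            return true;
        }
    }

    /**
     * Applies a partial update. On a version conflict it re-reads the document and
     * retries, up to retryOnConflict extra times (negative values mean one attempt).
     * Returns the number of attempts used, or -1 if every attempt conflicted.
     */
    static int update(InMemoryIndex index, String id, Map<String, Object> partial, int retryOnConflict) {
        int maxAttempts = 1 + Math.max(0, retryOnConflict);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Versioned current = index.get(id);
            Map<String, Object> merged = new HashMap<>(current.doc());
            merged.putAll(partial);
            if (index.compareAndSet(id, current.version(), merged)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        InMemoryIndex index = new InMemoryIndex();
        index.pendingConcurrentWrites = 2;
        System.out.println(update(index, "doc-1", Map.of("count", 1), 3)); // 3: succeeds on the third attempt
        index.pendingConcurrentWrites = 2;
        System.out.println(update(index, "doc-2", Map.of("count", 1), 0)); // -1: the update is lost
    }
}
```

With two simulated concurrent writes, a budget of 3 retries succeeds on the third attempt, while a budget of 0 fails outright, which corresponds to the lost write described above.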

Changes:

  • Added the sink.retry-on-conflicts option to ElasticsearchConnectorOptions. The default value is -1, which disables retries and maintains the previous behavior.
  • The option is propagated through the table factory and dynamic sink to the RowElasticsearchEmitter.
  • The UpdateRequest in RowElasticsearchEmitter is now configured with the specified number of retries on conflict.
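A rough sketch of the flow in the list above, written as plain Java so it runs without Flink or Elasticsearch on the classpath. `readRetryOnConflicts`, `EmitterSketch`, and `UpdateRequestSketch` are hypothetical stand-ins. The real connector presumably declares the option as a Flink `ConfigOption<Integer>` in `ElasticsearchConnectorOptions` and calls `retryOnConflict(int)` on Elasticsearch's `UpdateRequest`. How the actual change handles the -1 default is an assumption here: the sketch skips setting the value, leaving the request at its default of 0 retries.

```java
import java.util.Map;

public class RetryOptionPropagationSketch {

    // Option key and default as described in the pull request.
    static final String RETRY_ON_CONFLICTS_KEY = "sink.retry-on-conflicts";
    static final int RETRY_ON_CONFLICTS_DEFAULT = -1;

    /** Hypothetical factory step: read the table option, falling back to the -1 default. */
    static int readRetryOnConflicts(Map<String, String> tableOptions) {
        String raw = tableOptions.get(RETRY_ON_CONFLICTS_KEY);
        return raw == null ? RETRY_ON_CONFLICTS_DEFAULT : Integer.parseInt(raw.trim());
    }

    /** Hypothetical stand-in for Elasticsearch's UpdateRequest (default: 0 retries). */
    static final class UpdateRequestSketch {
        final String index;
        final String id;
        private int retryOnConflict = 0;

        UpdateRequestSketch(String index, String id) {
            this.index = index;
            this.id = id;
        }

        UpdateRequestSketch retryOnConflict(int retries) {
            this.retryOnConflict = retries;
            return this;
        }

        int retryOnConflict() {
            return retryOnConflict;
        }
    }

    /** Hypothetical emitter: holds the value passed down from the factory and dynamic sink. */
    static final class EmitterSketch {
        private final String index;
        private final int retryOnConflicts;

        EmitterSketch(String index, int retryOnConflicts) {
            this.index = index;
            this.retryOnConflicts = retryOnConflicts;
        }

        UpdateRequestSketch createUpdate(String id) {
            UpdateRequestSketch request = new UpdateRequestSketch(index, id);
            // Assumption: a negative value (the -1 default) leaves the request untouched,
            // so the sink behaves as it did before the option existed.
            if (retryOnConflicts >= 0) {
                request.retryOnConflict(retryOnConflicts);
            }
            return request;
        }
    }

    public static void main(String[] args) {
        int configured = readRetryOnConflicts(Map.of(RETRY_ON_CONFLICTS_KEY, "3"));
        int unset = readRetryOnConflicts(Map.of());
        System.out.println(new EmitterSketch("my-index", configured).createUpdate("doc-1").retryOnConflict()); // 3
        System.out.println(new EmitterSketch("my-index", unset).createUpdate("doc-1").retryOnConflict());      // 0
    }
}
```

With `'sink.retry-on-conflicts' = '3'` the request carries 3 retries; with the option unset it keeps the default of 0, matching the "previous behavior" described in the change list.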

Example Usage:

To enable retries, you can configure the option in your DDL:

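```sql
CREATE TABLE MyElasticsearchTable (
    -- Example columns; the primary key makes the sink run in upsert mode,
    -- which is where update requests (and therefore this option) come into play
    user_id STRING,
    clicks BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'my-index',
    'sink.retry-on-conflicts' = '3' -- Retry up to 3 times on conflict
);
```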

Myracle • Sep 17 '25 11:09

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] • Sep 17 '25 11:09