
[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause data loss during broker issues when not using EXACTLY_ONCE if there is any batching

Open hhktseng opened this issue 2 years ago • 8 comments

What is the purpose of the change

The purpose of this change is to address the fact that the current KafkaProducer flow behaves identically for DeliveryGuarantee.NONE and DeliveryGuarantee.AT_LEAST_ONCE.

It is based on the following understanding: there is a rare race condition between the flush performed on the producer in prepareSnapshotPreBarrier and the actual checkpoint completion on commit. Records can still arrive via processElement after the pre-barrier flush. If the KafkaProducer is retrying a batch that has not yet surfaced an error, a job failure (caused by the broker) means that batch is never delivered, and because the checkpoint already succeeded, those records are lost.
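The race above can be illustrated with a minimal model (the class and method names below are illustrative only, not the actual Flink or Kafka client classes): records sent after the pre-barrier flush remain in the client-side batch when the checkpoint completes, so a subsequent broker-induced failure drops them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a Kafka producer: send() buffers, flush() drains.
class ModelProducer {
    private final List<String> buffer = new ArrayList<>();

    void send(String record) {
        buffer.add(record); // record sits in a client-side batch
    }

    void flush() {
        buffer.clear(); // pretend delivery to the broker succeeded
    }

    boolean hasRecordsInBuffer() {
        return !buffer.isEmpty();
    }
}

public class RaceDemo {
    public static void main(String[] args) {
        ModelProducer producer = new ModelProducer();

        producer.send("r1");   // processElement
        producer.flush();      // prepareSnapshotPreBarrier flush
        producer.send("r2");   // processElement AFTER the pre-barrier flush
        // ... checkpoint completes here without another flush ...

        // r2 is still only buffered client-side; if the job now fails due
        // to a broker issue, r2 is lost even though the checkpoint that
        // should cover it already succeeded.
        System.out.println(producer.hasRecordsInBuffer()); // prints "true"
    }
}
```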

This PR addresses the issue by giving AT_LEAST_ONCE an opportunity to flush again at commit time when needed, ensuring that by the end of each checkpoint cycle the producer has no data left in its buffer.

Please comment on or verify the above understanding.

Brief change log

  • Add a hasRecordsInBuffer flag to FlinkKafkaInternalProducer, updated when send/flush/close are called
  • Add a transactional flag to KafkaCommittable to track whether a committable is transactional or not
  • Add a new constructor to KafkaCommittable for unit-test backward compatibility
  • Have prepareCommit() also return a list of committables for DeliveryGuarantee.AT_LEAST_ONCE
  • Have KafkaCommitter check the new transactional value on KafkaCommittable before performing commitTransaction(), preserving the original EXACTLY_ONCE pathway
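The committer-side decision described in the last two bullets can be sketched as follows (a simplified model under assumed names, not the actual connector code): the committable carries the new transactional flag, and the committer uses it to either commit the transaction (EXACTLY_ONCE, unchanged) or flush the producer's remaining buffer (AT_LEAST_ONCE, the new pathway).

```java
// Hypothetical stand-ins for KafkaCommittable / KafkaCommitter showing
// only the branching logic this PR introduces.
class Committable {
    final boolean transactional; // new field tracked by this change

    Committable(boolean transactional) {
        this.transactional = transactional;
    }
}

class Committer {
    // Returns the action taken, so the decision is easy to observe.
    String commit(Committable committable) {
        if (committable.transactional) {
            // Original EXACTLY_ONCE pathway is preserved untouched.
            return "commitTransaction";
        } else {
            // New AT_LEAST_ONCE pathway: flush any records that were
            // sent after the pre-barrier flush, so the buffer is empty
            // by the end of the checkpoint cycle.
            return "flush";
        }
    }
}

public class CommitDemo {
    public static void main(String[] args) {
        Committer committer = new Committer();
        System.out.println(committer.commit(new Committable(true)));  // prints "commitTransaction"
        System.out.println(committer.commit(new Committable(false))); // prints "flush"
    }
}
```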

Verifying this change

This change added tests and can be verified as follows:

  • Added a second-flush test covering this particular case (producer.send invoked after a flush)
  • Manually verified that the job runs correctly with this change on an existing cluster

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

hhktseng avatar Nov 30 '23 23:11 hhktseng

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Nov 30 '23 23:11 boring-cyborg[bot]

@hhktseng Can you rebase your PR?

MartijnVisser avatar Jan 17 '24 10:01 MartijnVisser

@hhktseng Can you rebase your PR?

@MartijnVisser can you point me to which commit to rebase onto?

thanks

hhktseng avatar Jan 19 '24 07:01 hhktseng

@MartijnVisser can you point me to which commit to rebase onto?

@hhktseng On the latest changes from main please

MartijnVisser avatar Jan 19 '24 07:01 MartijnVisser

Created a patch, synced to the latest commit, applied the patch, then replaced the forked branch with the latest sync + patch.

hhktseng avatar Jan 23 '24 18:01 hhktseng

@hhktseng were you able to test that this change mitigates your original issue? Is there a way to repro in the tests?

mas-chen avatar Apr 08 '24 18:04 mas-chen

@hhktseng thanks for working on this. Can you please address the review comments?

tweise avatar Jul 04 '24 16:07 tweise

Please check my comment here. https://issues.apache.org/jira/browse/FLINK-33545?focusedCommentId=17863737&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17863737 If everyone is sure that the current fix is addressing the actual issue, please go ahead.

AHeise avatar Jul 08 '24 10:07 AHeise