[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause data loss during broker issues when not using EXACTLY_ONCE if there's any batching
What is the purpose of the change
The current KafkaSink flow gives `DeliveryGuarantee.NONE` and `DeliveryGuarantee.AT_LEAST_ONCE` exactly the same behavior; this change addresses that.
It is based on the following understanding: the existing flush performed on the producer via `prepareSnapshotPreBarrier` and the actual checkpoint completion on commit leave a rare race window, because `processElement` can still send records after the pre-barrier flush. If the KafkaProducer is retrying a batch that has not yet thrown an error, a job failure (caused by the broker) means that batch is never delivered; since the checkpoint already succeeded, those records are lost.
This PR addresses the issue by giving AT_LEAST_ONCE an opportunity to flush again at commit time when needed, ensuring that by the end of the checkpoint cycle the producer has no data left in its buffer.
Please comment or verify on the above understanding.
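The race described above can be sketched with a minimal, self-contained model (hypothetical classes and names, not the actual Flink/Kafka code): a producer buffers records, the pre-barrier flush drains the buffer, but a record sent after that flush and before the checkpoint completes stays buffered and is lost if the broker fails. The `hasRecordsInBuffer` flag mirrors the one this PR adds.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushRaceSketch {
    // Simplified stand-in for a Kafka producer with client-side batching.
    static class MockProducer {
        final List<String> buffer = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        boolean hasRecordsInBuffer = false; // mirrors the flag added by this PR

        void send(String record) { buffer.add(record); hasRecordsInBuffer = true; }
        void flush() { delivered.addAll(buffer); buffer.clear(); hasRecordsInBuffer = false; }
    }

    // Old behavior: the only flush happens at prepareSnapshotPreBarrier.
    static MockProducer oldCheckpointCycle() {
        MockProducer p = new MockProducer();
        p.send("r1");
        p.flush();    // pre-barrier flush
        p.send("r2"); // late record arrives before the checkpoint completes
        // checkpoint completes here; a broker failure now loses "r2"
        return p;
    }

    // Fixed behavior: a commit-time flush drains any records left in the buffer.
    static MockProducer fixedCheckpointCycle() {
        MockProducer p = new MockProducer();
        p.send("r1");
        p.flush();    // pre-barrier flush
        p.send("r2"); // late record
        if (p.hasRecordsInBuffer) {
            p.flush(); // second flush at commit time for AT_LEAST_ONCE
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println("old buffer after checkpoint:   " + oldCheckpointCycle().buffer);
        System.out.println("fixed buffer after checkpoint: " + fixedCheckpointCycle().buffer);
    }
}
```

In this model the old cycle finishes the checkpoint with one record still buffered, while the fixed cycle finishes with an empty buffer.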
Brief change log
- Add variable `hasRecordsInBuffer` to `FlinkKafkaInternalProducer`; it will be set when send/flush/close are called
- Add variable `transactional` to `KafkaCommittable` to track whether a committable is transactional or not
- Add a new constructor to `KafkaCommittable` for unit-test backward compatibility
- Have `prepareCommit()` also return a list of committables for `DeliveryGuarantee.AT_LEAST_ONCE`
- Have `KafkaCommitter` check the new `transactional` value on `KafkaCommittable` to decide whether to perform `commitTransaction()`, preserving the original EXACTLY_ONCE pathway
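The committer-side branching in the last bullet can be illustrated with a minimal sketch (class and method names are assumed for illustration; they are not the actual Flink classes): a `transactional` flag on the committable decides between the original transaction commit and a plain flush.

```java
public class CommitterSketch {
    // Simplified stand-in for KafkaCommittable carrying the new flag.
    static class Committable {
        final boolean transactional; // flag added by this PR
        Committable(boolean transactional) { this.transactional = transactional; }
    }

    // Returns a description of the action taken, for illustration only.
    static String commit(Committable c) {
        if (c.transactional) {
            return "commitTransaction"; // original EXACTLY_ONCE pathway, unchanged
        } else {
            return "flush";             // AT_LEAST_ONCE: drain any remaining buffered records
        }
    }

    public static void main(String[] args) {
        System.out.println(commit(new Committable(true)));
        System.out.println(commit(new Committable(false)));
    }
}
```

Keeping the transactional branch untouched is what preserves the existing EXACTLY_ONCE behavior while giving AT_LEAST_ONCE its extra flush.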
Verifying this change
This change added tests and can be verified as follows:
- Added a second flush test covering this particular special case (where `producer.send` was invoked after a flush)
- Manually verified that the job runs correctly with this change in an existing cluster
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
@hhktseng Can you rebase your PR?
@MartijnVisser can you point me to which commit to rebase onto?
thanks
@hhktseng On the latest changes from main please
Created a patch, synced to the latest commit, applied the patch, then replaced the forked branch with the latest sync plus the patch.
@hhktseng were you able to test that this change mitigates your original issue? Is there a way to repro in the tests?
@hhktseng thanks for working on this. Can you please address the review comments?
Please check my comment here. https://issues.apache.org/jira/browse/FLINK-33545?focusedCommentId=17863737&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17863737 If everyone is sure that the current fix is addressing the actual issue, please go ahead.