[FLINK-28626][tests] Fix unstable RescaleCheckpointManuallyITCase
What is the purpose of the change
Refactor RescaleCheckpointManuallyITCase.
Brief change log
- Modify NotifyingDefiniteKeySource to wait for checkpoints to complete.
- Make the job fail artificially, to retain the completed checkpoint.
- Modify the logic of emitting elements in SubtaskIndexFlatMapper.
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
This change is already covered by existing tests, such as RescaleCheckpointManuallyITCase.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
Hi @Myasuka, this test has always been unstable because of the way we obtained checkpoints, so I refactored it. Could you please take a look?
CI report:
- 08d3d48c487a9863ced346da03530d43bd518de9 UNKNOWN
- 5b5f1ea5f4f5a1d8092b7558a8c323fb034ddfa3 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Thanks a lot for reviewing.
I still don't get why the test is unstable. Could you describe it more clearly?
According to the stack traces in FLINK-27162 and FLINK-28626, the test fails with a FileNotFoundException. This is because:
- triggerCheckpoint() is unstable: it might cause "checkpoint expired before completing"; see the exception stack trace in FLINK-28529.
- The checkpoint obtained by getMostRecentCompletedCheckpoint may be incomplete when unaligned checkpoints are enabled; every FileNotFoundException is thrown when reading files under taskowned.
I changed the test to obtain checkpoints by waiting for notifyCheckpointComplete(), which avoids using triggerCheckpoint().
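To illustrate the idea of waiting for notifyCheckpointComplete(), here is a minimal sketch that runs without Flink on the classpath. It is not the code from this PR: CheckpointCompleteCallback is a local stand-in that mirrors the callback on Flink's CheckpointListener, and WaitingSource, CheckpointWaitSketch, and runAndAwaitCheckpoint are hypothetical names chosen for the example. The source emits its elements and then blocks on a latch until a checkpoint has been confirmed, instead of returning and letting the job finish first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Local stand-in (assumption) for the completion callback on Flink's
// CheckpointListener, declared here so the sketch compiles without Flink.
interface CheckpointCompleteCallback {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Hypothetical source: emits a fixed batch, then waits for a confirmed checkpoint.
class WaitingSource implements CheckpointCompleteCallback {
    private final CountDownLatch checkpointCompleted = new CountDownLatch(1);
    private volatile long completedCheckpointId = -1L;
    final List<Integer> emitted = new ArrayList<>();

    // Emits all elements, then blocks until the callback fires or the timeout
    // elapses. Returns true only if a checkpoint was confirmed in time.
    boolean run(int numElements, long timeoutMillis) throws InterruptedException {
        for (int i = 0; i < numElements; i++) {
            emitted.add(i);
        }
        return checkpointCompleted.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Record the id before releasing the waiting thread.
        completedCheckpointId = checkpointId;
        checkpointCompleted.countDown();
    }

    long getCompletedCheckpointId() {
        return completedCheckpointId;
    }
}

class CheckpointWaitSketch {
    // Runs the source on the calling thread and delivers the callback from a
    // second thread, which plays the role of the checkpoint coordinator.
    static long runAndAwaitCheckpoint(long checkpointId) throws Exception {
        WaitingSource source = new WaitingSource();
        Thread coordinator = new Thread(() -> source.notifyCheckpointComplete(checkpointId));
        coordinator.start();
        boolean confirmed = source.run(5, 10_000L);
        coordinator.join();
        if (!confirmed) {
            throw new IllegalStateException("no checkpoint completed in time");
        }
        return source.getCompletedCheckpointId();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed checkpoint: " + runAndAwaitCheckpoint(1L));
    }
}
```

The point of the latch is that the test only proceeds once a checkpoint has actually been acknowledged, so it never depends on the timing of an explicitly triggered checkpoint.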
Thanks a lot for reviewing, squashed.
Do we also need to cherry-pick the changes to the release-1.15 branch?
No. According to FLINK-21321, this test (1bf45b25791cc3fad8b7d0d863caa9b0eef9a87b) was only merged into master.
Hi @Myasuka, could you please help merge this PR?