[FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging
What is the purpose of the change
Concurrent exceptions currently do not work.
Brief change log
- [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting
  - The first commit is a refactoring.
  - Currently, exceptions are archived when the task restarts rather than immediately. This means that when a task fails, its exception only shows up in the exception history after Flink restarts the task.
  - So the first commit is only a refactoring: it archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.
- [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
  - The second commit is related to the restart strategy: it adds a return value that indicates whether the current failure is a new attempt.
- [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging
  - The third commit is the core solution of this JIRA:
    - If it's a new attempt, the exception is a root exception, and it becomes the latest root exception.
    - If it's not a new attempt, the exception is a concurrent exception, and it is added to the latest root exception.
- [FLINK-33565][Scheduler] Correct the numberOfRestarts metric
  - numberOfRestarts should be increased when the exception is a new attempt
- [FLINK-33565][Scheduler][refactor] Remove the wrong @Nullable annotation of FailureHandlingResult, because the constructor checks that verticesToRestart is not null
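The merging rule from the third and fourth commits can be illustrated with a minimal, self-contained sketch. This is not the Flink implementation: `ExceptionHistorySketch`, `RootEntry` and `onFailure` are hypothetical names made up for illustration, and the `isNewAttempt` flag is simply passed in here, whereas in the PR it comes from the restart strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; none of these types exist in Flink.
class ExceptionHistorySketch {

    /** Stand-in for a root exception entry together with its concurrent exceptions. */
    static final class RootEntry {
        final Throwable root;
        final List<Throwable> concurrentExceptions = new ArrayList<>();

        RootEntry(Throwable root) {
            this.root = root;
        }
    }

    private final List<RootEntry> history = new ArrayList<>();
    private long numberOfRestarts = 0;

    /**
     * Archives the failure immediately. A new attempt starts a new root entry and
     * bumps the restart counter; any other failure is attached to the latest root.
     */
    void onFailure(Throwable failure, boolean isNewAttempt) {
        // Assumption of this sketch: a failure arriving before any root exists
        // is treated as a new attempt, so there is always a root to attach to.
        if (isNewAttempt || history.isEmpty()) {
            history.add(new RootEntry(failure));
            numberOfRestarts++;
        } else {
            history.get(history.size() - 1).concurrentExceptions.add(failure);
        }
    }

    List<RootEntry> getHistory() {
        return history;
    }

    long getNumberOfRestarts() {
        return numberOfRestarts;
    }

    public static void main(String[] args) {
        ExceptionHistorySketch sketch = new ExceptionHistorySketch();
        sketch.onFailure(new RuntimeException("region-1 failed"), true);
        sketch.onFailure(new RuntimeException("region-2 failed"), false);
        sketch.onFailure(new RuntimeException("region-3 failed"), true);

        System.out.println("root entries: " + sketch.getHistory().size());
        System.out.println("restarts: " + sketch.getNumberOfRestarts());
    }
}
```

In this sketch, three failures of which two are new attempts yield two root entries, with the second failure recorded as a concurrent exception of the first root, and a restart count of two.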
Verifying this change
- Improve some tests of ExecutionFailureHandlerTest, and add a new test: testNewAttemptAndNumberOfRestarts
- Improve some old tests of ExponentialDelayRestartBackoffTimeStrategyTest to check whether isNewAttempt is as expected
- Improve some tests of FailureHandlingResultTest to cover isNewAttempt being true and false
- Improve DefaultSchedulerTest#testExceptionHistoryConcurrentRestart
- Check that isNewAttempt is as expected in FailureHandlingResultSnapshotTest
- Test addConcurrentExceptions in RootExceptionHistoryEntryTest
Test manually
I added a demo job with 6 regions in which every task fails when processing its first record. The demo job can be run directly. In the result, all failed tasks are visible in the WebUI.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
CI report:
- 794923631ddf2be0c11756359e5dfeab10ed46ff Azure: SUCCESS
Thanks for your efforts @1996fanrui. Overall, looks good. I can do a proper review after you've finalized the PR. I just added a few nitty comments. PTAL
Many thanks @XComp for the review! If the solution is fine, I will address your comments and finish the tests as soon as possible!
Hi @XComp, sorry to bother you again, but 1.19 will be frozen next week.
@flinkbot run azure
LGTM 👍 Thanks for your efforts, @1996fanrui, and for keeping up with my nitty comments. I enjoyed the discussions in this PR.
Thanks @XComp for the patient review, I definitely learned some skills from your comments. I enjoyed the discussions as well.
About the commits: We can reorganize them a bit, I feel. You could keep 6fe8037 and f422804 as separate hotfix commits (using the [hotfix] prefix rather than the Jira issue) because they improve the code base and we would want to keep them even if we decide to revert FLINK-33565 (for whatever reason) in the future. WDYT?
The idea makes sense to me. 👍🏻
I updated the commit message for the last commit, but I didn't update it for the first commit, for the following reasons:
- The first commit changes the behavior.
  - Before this PR, exceptions were archived when the task restarted rather than immediately.
  - This means that when a task fails, we can only see the exception in the exception history after Flink restarts the task.
  - The first commit archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.
- I hope the first commit is fine, but I'm afraid we may have missed something, so it's not a simple refactoring or hotfix.
WDYT?
Fair enough. What about squashing the FLINK-33565-related commits into a single one?
Hi @XComp, that's fine with me. Let me explain why I split them into multiple commits:
- First of all, the Flink code style guide [1] recommends separating refactoring, cleanup and independent changes into separate commits. As I understand it, this has several advantages:
  - Easy to review
  - Easy to revert
  - Easy to cherry-pick
- In my daily work, we often cherry-pick new features that our users need into our internal Flink version, because upgrading the Flink version of many jobs takes a lot of effort.
- During cherry-picking, I found that keeping independent changes as separate commits is very useful.
  - For example, suppose we need to cherry-pick featureB, and featureA was merged before featureB.
  - FeatureB uses some common changes from featureA: featureA added some new fields to some common classes. (It's not a refactoring, but the change isn't huge.)
  - If featureA treats this as a minor change and keeps everything in one commit, we must cherry-pick all of featureA when cherry-picking featureB.
  - If featureA is split into one commit per independent change, we can cherry-pick just the common class changes of featureA plus the whole of featureB.
- Currently, the first commit changes a minor behavior, and it's similar to a refactoring. This was mentioned in the last comment: https://github.com/apache/flink/pull/24003#issuecomment-1903758592
- The second commit is [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt.
  - The concurrent exceptions depend on the result of RestartBackoffTimeStrategy.
  - Other features might depend on it as well in the future, so I prefer to keep it as an independent change.
- The third commit is the core change of this feature.
- The fourth commit fixes the numberOfRestarts metric; it may be an independent change as well.
That's why I split them into 4 commits.
As I said before, squashing them into one commit is fine with me. Looking forward to your feedback, thanks~
[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/#3-separate-refactoring-cleanup-and-independent-changes
ok, fine with me :+1:
Thanks @XComp for the patient review and a series of great suggestions again! Merging~
A good experience~