
[FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging

Open 1996fanrui opened this issue 1 year ago • 2 comments

What is the purpose of the change

Concurrent exceptions currently do not work.

Brief change log

  • [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting
    • Currently, an exception is archived when the task is restarted rather than when it occurs. This means that when a task fails, its exception only appears in the exception history after Flink restarts the task.
    • The first commit is therefore mostly a refactoring: it archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.
  • [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
    • The second commit concerns the restart strategy: it adds a return value that indicates whether the current failure is a new attempt.
  • [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging
    • The third commit is the core solution for this JIRA issue:
    • If the failure is a new attempt, it is a root exception and becomes the latest root exception.
    • If it is not a new attempt, it is a concurrent exception and is added to the latest RootException.
  • [FLINK-33565][Scheduler] Correct the numberOfRestarts metric
    • numberOfRestarts should be incremented when the exception is a new attempt
  • [FLINK-33565][Scheduler][refactor] Remove the incorrect @Nullable annotation from FailureHandlingResult, because the constructor checks that verticesToRestart is not null
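
The merging rule described in the change log can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: the class ExceptionMergingSketch, the nested RootEntry type, and the onFailure method are hypothetical names, and the isNewAttempt flag is assumed to be supplied by the restart strategy. Treating the very first failure as a root entry even when the flag is false is also an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the merging rule; not Flink's actual classes.
class ExceptionMergingSketch {

    /** Hypothetical stand-in for a root entry in the exception history. */
    static final class RootEntry {
        final Throwable root;
        final List<Throwable> concurrentExceptions = new ArrayList<>();

        RootEntry(Throwable root) {
            this.root = root;
        }
    }

    private final List<RootEntry> history = new ArrayList<>();
    private long numberOfRestarts = 0;

    /**
     * Archives a failure as soon as it occurs.
     *
     * @param isNewAttempt assumed to be reported by the restart strategy
     */
    void onFailure(Throwable failure, boolean isNewAttempt) {
        // Assumption: with an empty history there is no root to merge into,
        // so the first failure always starts a new root entry.
        if (isNewAttempt || history.isEmpty()) {
            history.add(new RootEntry(failure));
            numberOfRestarts++;
        } else {
            // Not a new attempt: attach to the latest root exception.
            history.get(history.size() - 1).concurrentExceptions.add(failure);
        }
    }

    int rootCount() {
        return history.size();
    }

    int concurrentCountOfLatestRoot() {
        return history.get(history.size() - 1).concurrentExceptions.size();
    }

    long numberOfRestarts() {
        return numberOfRestarts;
    }

    public static void main(String[] args) {
        ExceptionMergingSketch sketch = new ExceptionMergingSketch();
        sketch.onFailure(new RuntimeException("region 1 failed"), true);
        sketch.onFailure(new RuntimeException("region 2 failed"), false);
        sketch.onFailure(new RuntimeException("region 3 failed"), true);
        System.out.println("roots=" + sketch.rootCount()
                + ", restarts=" + sketch.numberOfRestarts());
    }
}
```

In this sketch the restart counter moves only together with a new root entry, which mirrors the intent of the numberOfRestarts fix: failures merged into an existing attempt do not count as additional restarts.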

Verifying this change

  • Improved some tests in ExecutionFailureHandlerTest and added a new test: testNewAttemptAndNumberOfRestarts
  • Improved some existing tests in ExponentialDelayRestartBackoffTimeStrategyTest to check that isNewAttempt has the expected value
  • Improved some tests in FailureHandlingResultTest to cover isNewAttempt being both true and false
  • Improved DefaultSchedulerTest#testExceptionHistoryConcurrentRestart
  • Checked that isNewAttempt has the expected value in FailureHandlingResultSnapshotTest
  • Tested addConcurrentExceptions in RootExceptionHistoryEntryTest

Test manually

I added a demo job with 6 regions in which all tasks fail when processing the first record. The demo job can be run directly. In the result, all failed tasks are visible in the WebUI.

image

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

1996fanrui avatar Dec 28 '23 03:12 1996fanrui

CI report:

  • 794923631ddf2be0c11756359e5dfeab10ed46ff Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Dec 28 '23 03:12 flinkbot

Thanks for your efforts @1996fanrui. Overall, looks good. I can do a proper review after you've finalized the PR. I just added a few nitty comments. PTAL

Many thanks @XComp for the review! If the solution is fine, I will address your comments and finish the tests as soon as possible!

1996fanrui avatar Jan 08 '24 14:01 1996fanrui

Hi @XComp, sorry to bother you again, but 1.19 will be frozen next week.

1996fanrui avatar Jan 17 '24 08:01 1996fanrui

@flinkbot run azure

1996fanrui avatar Jan 19 '24 02:01 1996fanrui

LGTM 👍 Thanks for your efforts, @1996fanrui, and for keeping up with my nitty comments. I enjoyed the discussions in this PR.

Thanks @XComp for the patient review, I definitely learned some skills from your comments. I enjoyed the discussions as well.

About the commits: We can reorganize them a bit, I feel. You could keep 6fe8037 and f422804 as separate hotfix commits (using the [hotfix] prefix rather than the Jira issue) because they improve the code base and we would want to keep them even if we decide to revert FLINK-33565 (for whatever reason) in the future. WDYT?

The idea makes sense to me. 👍🏻

I updated the commit message for the last commit, but I didn't update it for the first commit, for the following reasons:

  1. The first commit changes behavior
    • Before this PR, an exception was archived when the task was restarted rather than immediately.
    • This means that when a task fails, its exception only appears in the exception history after Flink restarts the task.
    • The first commit archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.
  2. I hope the first commit is fine, but I'm afraid we may have missed something, so it's not a simple refactoring or hotfix.

WDYT?

1996fanrui avatar Jan 22 '24 10:01 1996fanrui

Fair enough. What about squashing the FLINK-33565-related commits into a single one?

XComp avatar Jan 22 '24 12:01 XComp

Fair enough. What about squashing the FLINK-33565-related commits into a single one?

Hi @XComp, that's fine with me. Let me explain why I split them into multiple commits:

  1. First of all, the Flink code style guide [1] recommends separating refactoring, cleanup, and independent changes into separate commits. As I understand it, this has several advantages:
  • Easy to review
  • Easy to revert
  • Easy to cherry-pick
    • In my daily work, we regularly cherry-pick new features that our users need into our internal Flink version, because upgrading the Flink version of many jobs takes a lot of effort.
    • While cherry-picking, I found that keeping independent changes in separate commits is very useful.
    • For example, suppose we need to cherry-pick featureB, and featureA was merged before featureB.
    • FeatureB uses some common changes from featureA: featureA added new fields to some common classes. (This is not a refactoring, but the change isn't huge.)
    • If featureA's author considers this a minor change and keeps everything in one commit, we must cherry-pick all of featureA when cherry-picking featureB.
    • If featureA is split into one commit per independent change, we can cherry-pick just the common class changes from featureA, plus the whole of featureB.
  1. Currently, the first commit changes a minor behavior, so it is similar to a refactoring. This was mentioned in the last comment: https://github.com/apache/flink/pull/24003#issuecomment-1903758592
  2. The second commit is [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt.
    • The concurrent exceptions depend on the result of RestartBackoffTimeStrategy.
    • Other features might depend on it as well in the future, so I prefer to keep it as an independent change.
  3. The third commit is the core change of this feature.
  4. The fourth commit fixes the numberOfRestarts metric; it may be an independent change as well.

That's why I split them into 4 commits.

As I said before, squashing them into one commit is fine with me. Looking forward to your feedback, thanks~

[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/#3-separate-refactoring-cleanup-and-independent-changes

1996fanrui avatar Jan 23 '24 03:01 1996fanrui

ok, fine with me :+1:

XComp avatar Jan 23 '24 09:01 XComp

Thanks @XComp for the patient review and a series of great suggestions again! Merging~

A good experience~

1996fanrui avatar Jan 23 '24 10:01 1996fanrui