flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Open JesseAtSZ opened this issue 2 years ago • 42 comments

What is the purpose of the change

This pull request is related to FLINK-27570.

When checkpoints are stored on CheckpointCoordinator using the FileSystemCheckpointStorage, initializeBaseLocationsForCheckpoint may failed to initialize sharedStateDirectory and taskOwnedStateDirectory.

However, no further exception handling is done here. Even if the initializeBaseLocationsForCheckpoint fails, the job can still be started successfully.

In addition, errors will be reported at each subsequent checkpoint: Failure to finalize checkpoint.

Finally, because FINALIZE_CHECKPOINT_FAILURE is ignored at checkFailureCounter and doesn't participate in the statistics of tolerableCpFailureNumber, the checkpoint may fail all the time, but the user cannot perceive it.

Brief change log

Count FINALIZE_CHECKPOINT_FAILURE as a real failure in CheckpointFailureManager.

Abort the checkpoint for operators using union list state through CHECKPOINT_DECLINED_TASK_CLOSING, if parts of the subtasks have finished.

Check whether the sharedStateDirectory and the taskOwnedStateDirectory are created successfully for FileSystemCheckpointStorage on CheckpointCoordinator.

Verifying this change

This change added tests and can be verified as follows:

  • Added test case to verify FINALIZE_CHECKPOINT_FAILURE can eventually fails the job.
  • Manually verified the change by use an invalid checkpoint path.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
  • The S3 file system connector: (no)
  • Has specific attention for Checkpointing by @curcur

Documentation

  • Does this pull request introduce a new feature? (no)

JesseAtSZ avatar Jun 28 '22 11:06 JesseAtSZ

CI report:

  • c9981b5e6c44b73b44cbbb163523b78f2375b484 Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Jun 28 '22 11:06 flinkbot

Thanks for creating this PR @JesseAtSZ. IIUC, you expect an IOException when the checkpoint directory exists(fileSystem.mkdir() return false)? I think it's a little wired, could you please explain your motivation?

fredia avatar Jul 01 '22 03:07 fredia

@fredia In my example, I specify the path of checkpoint in CentOS as /D/, fileSystem.mkdirs() will return false, and the file creation fails, so the checkpoint initialize fails. In addition, errors will be reported at each subsequent checkpoint: Failure to finalize checkpoint. Although I specified the tolerableCpFailureNumber parameter,FINALIZE_CHECKPOINT_FAILURE is ignored at checkFailureCounter() and does not participate in the statistics of tolerableCpFailureNumber, So as the job progresses, the checkpoint will always fail, but the user cannot perceive it.

JesseAtSZ avatar Jul 01 '22 04:07 JesseAtSZ

@JesseAtSZ Thanks for your explanation, fileSystem.mkdirs() not only returns false when the directory exists, but also returns false when there is no permission to write. This PR makes sense to me.

fredia avatar Jul 01 '22 04:07 fredia

@fredia I updated the PR, could you please take a review again?

JesseAtSZ avatar Jul 01 '22 12:07 JesseAtSZ

@rkhachatryan could you please take a review?

fredia avatar Jul 04 '22 02:07 fredia

Thanks for the PR @JesseAtSZ ,

I'm trying to understand why checkpoints are failing with FINALIZE_CHECKPOINT_FAILURE (which is ignored by CheckpointFailureManager) and not something like IOException. From the code, it might happen only in CheckpointCoordinator - when all the tasks have already acknowleged the checkpoint. That probably means that the job is stateless. Could you confirm that @JesseAtSZ ?

If that's NOT the case then we probably should fix failure counting first.

Another question is related to the TM - do we need a symmetric check there? (in FsCheckpointStorageAccess.resolveCheckpointStorageLocation).

rkhachatryan avatar Jul 04 '22 08:07 rkhachatryan

@MartijnVisser Thank for the review! could you please take a review again? In addition, I would like to ask about our code version management. Is it allowed to merge code into the released version, such as 1.14.4? I found this problem on 1.14.4.

JesseAtSZ avatar Jul 04 '22 08:07 JesseAtSZ

Thank for the review! could you please take a review again?

Sure, but @rkhachatryan his comments are more important here :)

Is it allowed to merge code into the released version, such as 1.14.4? I found this problem on 1.14.4.

Code will always be released with a new version, so that would be Flink 1.14.5 if it would be merged and a release would be created later.

MartijnVisser avatar Jul 04 '22 08:07 MartijnVisser

@MartijnVisser So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?

JesseAtSZ avatar Jul 04 '22 08:07 JesseAtSZ

So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?

Correct yes

MartijnVisser avatar Jul 04 '22 08:07 MartijnVisser

@MartijnVisser I found that some branches end in the string -rc. What does this mean?

JesseAtSZ avatar Jul 04 '22 08:07 JesseAtSZ

I found that some branches end in the string -rc. What does this mean?

Release candidates, those are created prior to a release being released

MartijnVisser avatar Jul 04 '22 08:07 MartijnVisser

Thanks for the PR @JesseAtSZ ,

I'm trying to understand why checkpoints are failing with FINALIZE_CHECKPOINT_FAILURE (which is ignored by CheckpointFailureManager) and not something like IOException. From the code, it might happen only in CheckpointCoordinator - when all the tasks have already acknowleged the checkpoint. That probably means that the job is stateless. Could you confirm that @JesseAtSZ ?

If that's NOT the case then we probably should fix failure counting first.

Another question is related to the TM - do we need a symmetric check there? (in FsCheckpointStorageAccess.resolveCheckpointStorageLocation).

@rkhachatryan FINALIZE_CHECKPOINT_FAILURE only occurs in the checkpoint coordinator. I don't think it is necessary to check the path on TM, because the initialization on Coordinator may before the performCheckpoint on TM.

JesseAtSZ avatar Jul 04 '22 12:07 JesseAtSZ

Thanks @JesseAtSZ, Could you please confirm that the job is stateless?

the initialization on Coordinator will before the performCheckpoint on TM.

Not always - if a default location was used by JM (I'm not saying the check on TM should be added, just clarifying).

rkhachatryan avatar Jul 04 '22 13:07 rkhachatryan

@rkhachatryan The job you mentioned stateless, does it mean that whether state is used in in my job? FLINK-27570 was first found in a job with state, later, I constructed a demo that didn't use state, which also has the same problem.

JesseAtSZ avatar Jul 04 '22 14:07 JesseAtSZ

Got it, thanks. My concern is that probably we should fix the error handling in CheckpointFailureManager, instead of mkdirs call:

    case FINALIZE_CHECKPOINT_FAILURE:
        // ignore
        break;

The documentation for execution.checkpointing.tolerable-failed-checkpoints says that it applies to any IOException on the Job Manager. This is the case here (IOException gets wrapped into CheckpointException when I try it locally).

By adding a check of mkdirs, we do fix this particular problem; but any other IOException (e.g. intermittent failure when writing the _metadata file) will be ignored by CheckpointFailureManager.

So how about counting FINALIZE_CHECKPOINT_FAILURE as a real failure in CheckpointFailureManager? We could still fail fast if the directories could not be created, but that would be just an optimization then.

rkhachatryan avatar Jul 04 '22 21:07 rkhachatryan

@rkhachatryan That's also my question when reading the code. I'm curious about why this exception was ignored. I don't know the original intention of ignoring this exception before (nor in the comments), so I choose to modify it in initializeBaseLocationsForCheckpoint. If you are sure to modify it like this, I can update the PR.

JesseAtSZ avatar Jul 05 '22 01:07 JesseAtSZ

Got it, thanks. My concern is that probably we should fix the error handling in CheckpointFailureManager, instead of mkdirs call:

    case FINALIZE_CHECKPOINT_FAILURE:
        // ignore
        break;

The documentation for execution.checkpointing.tolerable-failed-checkpoints says that it applies to any IOException on the Job Manager. This is the case here (IOException gets wrapped into CheckpointException when I try it locally).

By adding a check of mkdirs, we do fix this particular problem; but any other IOException (e.g. intermittent failure when writing the _metadata file) will be ignored by CheckpointFailureManager.

So how about counting FINALIZE_CHECKPOINT_FAILURE as a real failure in CheckpointFailureManager? We could still fail fast if the directories could not be created, but that would be just an optimization then.

@pnowojski Do you have an idea why FINALIZE_CHECKPOINT_FAILURE is not counted as a checkpoint failure? Looks strange to me as well.

If no strong reason, I go with @rkhachatryan , we should count FINALIZE_CHECKPOINT_FAILURE as well.

curcur avatar Jul 05 '22 03:07 curcur

The initial set of ignored failure reasons was not well thought through and has been changed/adjusted over and over again, once we found a good example that a failure shouldn't be ignored. So:

Do you have an idea why FINALIZE_CHECKPOINT_FAILURE is not counted as a checkpoint failure? Looks strange to me as well.

There is probably no good reason why.

If no strong reason, I go with @rkhachatryan , we should count FINALIZE_CHECKPOINT_FAILURE as well.

👍 seems like a good change. Looking at the other failures FINALIZE_CHECKPOINT_FAILURE fits much better with things like:

            case IO_EXCEPTION:
            case CHECKPOINT_ASYNC_EXCEPTION:
            case CHECKPOINT_DECLINED:
            case CHECKPOINT_EXPIRED:

Rather than checkpoint declined/subsumed. Just remember noting this change in the release notes in this case, as this would be slightly changing behaviour of the system.

I have similar doubts if maybe TRIGGER_CHECKPOINT_FAILURE shouldn't be ignored as well?

pnowojski avatar Jul 05 '22 10:07 pnowojski

Regarding TRIGGER_CHECKPOINT_FAILURE,

I had the following concerns - but after checking the code they turned out to be wrong: If counted as failure:

  • "not all tasks are running" could cause it - wrong: there is a separate NOT_ALL_REQUIRED_TASKS_RUNNING constant
  • timing issues or existing concurrent checkpoints can produce it - wrong: they are separate constants as well

If not counted as failure (left as is):

  • if we check mkdirs return value then it will cause TRIGGER_CHECKPOINT_FAILURE, again breaking the counter - wrong: mkdirs is called during the CheckpointCoordinator initialization

Still, I'm leaning towards leaving it as is because there are probably other edge cases when it happens and shouldn't be counted as a failure, e.g. shutdown.

I don't see any failure reasons that we should change handling of.

rkhachatryan avatar Jul 05 '22 20:07 rkhachatryan

I'm fine with that

pnowojski avatar Jul 06 '22 10:07 pnowojski

@rkhachatryan @MartijnVisser @curcur @pnowojski @fredia I updated the PR.

JesseAtSZ avatar Jul 06 '22 11:07 JesseAtSZ

@rkhachatryan

I'm wondering whether this check is sufficient. mkdirs javadoc says: @return true if at least one new directory has been created, false otherwise

The mkdirs(final Path f) comment in LocalFileSystem mentions that:

Recursively creates the directory specified by the provided path.

I think "at least one" means during the recursive creation of folders, multiple folders may be generated according to the folder hierarchy, so at least one directory will be generated.

In addition, Run e2e tests and Test - table have failed all the time, what‘s’ the possible reason? It seems unrelated to this change.

JesseAtSZ avatar Jul 07 '22 11:07 JesseAtSZ

I've checked it locally and found that WindowDistinctAggregateITCase fails with Exceeded checkpoint tolerable failure threshold, which is preceeded by

 Caused by: org.apache.flink.util.FlinkRuntimeException: The vertex GlobalWindowAggregate[28] -> Calc[29] -> LocalWindowAggregate[30] (id = 6853e3e e87fc960bbdfeb6fc8a232136) has used UnionListState, but part of its tasks has called operators' finish method.
         at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(DefaultCheckpointPlan. java:187) ~[classes/:?]
         at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.fulfillFinishedTaskStatus(DefaultCheckpointPlan.java:138) ~[classes/:?]
         at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323) ~[classes/:?]
         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1356) ~[classes/:?]

@gaoyunhaii Do you know what may be the reason?

It seems unrelated to this change, so I'd open a ticket for investigation and temporarily increase TolerableCheckpointFailureNumber in the affected tests, WDYT?

rkhachatryan avatar Jul 07 '22 14:07 rkhachatryan

@gaoyunhaii Do you know what may be the reason?

This might be an expected behavior, I'll have a double look at the test~

gaoyunhaii avatar Jul 08 '22 08:07 gaoyunhaii

@gaoyunhaii did you have a chance to look at the test?

@JesseAtSZ right now it's only a single test failure, which is likely related to a particular case (finished sources). So I think it makes sense to investigate it.

rkhachatryan avatar Jul 13 '22 13:07 rkhachatryan

@gaoyunhaii did you have a chance to look at the test?

Hi @rkhachatryan very sorry for the delay, I'll have a conclusion for the issue today~

gaoyunhaii avatar Jul 14 '22 07:07 gaoyunhaii

Hi @rkhachatryan very sorry for the delay, I have a check with the logic here, the cause of the failure is that

  1. For final checkpoint mechanism we has some specialization treatment for operators using union list state that we will abort the checkpoint if parts of the subtasks have finished. This avoid the possible state loss and data inconsistency for union list state.
  2. The checking happens in finalize step, in this step, it would check all the operators using union list state, if parts of its subtasks finished, an exception will be thrown and the checkpoint will be failed with FINALIZE_CHECKPOINT_FAILURE
  3. Since in this PR we instead count FINALIZE_CHECKPOINT_FAILURE as explicit failures, thus some tests will be affected.

Since the failures with union list state is currently a by-design behavior, I tend to we distinguish this case with other finalize errors: for this case we could fail the checkpoint with a dedicated reason and continue to not counting them.

gaoyunhaii avatar Jul 14 '22 08:07 gaoyunhaii

Thanks a lot for your analysis @gaoyunhaii!

I think we can distinguish these cases in different ways:

  1. checkNoPartlyOperatorsFinishedVertexUsedUnionListState returns a result value (e.g. boolean) which then is checked in call hierarchy
  2. checkNoPartlyOperatorsFinishedVertexUsedUnionListState throws a CheckpointException with a new reason (and not wrapping in CheckpointCoordinator.finalizeCheckpoint)
  3. checkNoPartlyOperatorsFinishedVertexUsedUnionListState throws a new type of exception; then in CheckpointCoordinator.finalizeCheckpoint we use a different failure reason for this type of exception

(1) seems the cleanest way but requires more changes (4 calls) (2) might have some unanticipated consequences because of changing the exception type (3) seems a good compromise between invasiveness and cleanliness

So I'd implement (3) or (1), WDYT? @JesseAtSZ , @gaoyunhaii

rkhachatryan avatar Jul 14 '22 10:07 rkhachatryan