[FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure
What is the purpose of the change
This pull request is related to FLINK-27570.
When checkpoints are stored via the FileSystemCheckpointStorage, initializeBaseLocationsForCheckpoint on the CheckpointCoordinator may fail to create the sharedStateDirectory and the taskOwnedStateDirectory.
However, no further exception handling is done there: even if initializeBaseLocationsForCheckpoint fails, the job still starts successfully.
In addition, every subsequent checkpoint then reports the error: Failure to finalize checkpoint.
Finally, because FINALIZE_CHECKPOINT_FAILURE is ignored in checkFailureCounter and does not count towards tolerableCpFailureNumber, the checkpoints can keep failing indefinitely without the user noticing.
Brief change log
- Count FINALIZE_CHECKPOINT_FAILURE as a real failure in CheckpointFailureManager.
- Abort the checkpoint for operators using union list state through CHECKPOINT_DECLINED_TASK_CLOSING if parts of the subtasks have finished.
- Check whether the sharedStateDirectory and the taskOwnedStateDirectory are created successfully for FileSystemCheckpointStorage on the CheckpointCoordinator (see the sketch below).
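For illustration, a minimal sketch of the directory check from the last item above, assuming the directories are created via Flink's FileSystem API; the class and method names below are illustrative rather than the exact shape of the real FsCheckpointStorageAccess code:

import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/** Illustrative helper; in the real code the check lives in the checkpoint storage access. */
final class CheckpointBaseLocationCheck {

    /** Creates the checkpoint base directories, failing fast if mkdirs reports no success. */
    static void createBaseDirectories(
            FileSystem fileSystem, Path sharedStateDirectory, Path taskOwnedStateDirectory)
            throws IOException {
        // FileSystem#mkdirs returns false e.g. when the caller has no permission to write,
        // so ignoring the return value silently defers the failure to checkpoint finalization.
        if (!fileSystem.mkdirs(sharedStateDirectory)) {
            throw new IOException("Failed to create directory: " + sharedStateDirectory);
        }
        if (!fileSystem.mkdirs(taskOwnedStateDirectory)) {
            throw new IOException("Failed to create directory: " + taskOwnedStateDirectory);
        }
    }
}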
Verifying this change
This change added tests and can be verified as follows:
- Added a test case to verify that FINALIZE_CHECKPOINT_FAILURE can eventually fail the job.
- Manually verified the change by using an invalid checkpoint path (see the sketch below).
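A rough sketch of that manual verification, assuming a job configured through the DataStream API; the invalid path and the tolerable failure count mirror the report in this thread, and the exact configuration setters may differ slightly between Flink versions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InvalidCheckpointPathRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10s into a path that cannot be created (mkdirs returns false).
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setCheckpointStorage("file:///D/checkpoints");
        // With a finite tolerable failure count, the job should eventually fail
        // once FINALIZE_CHECKPOINT_FAILURE is counted as a real failure.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        env.fromSequence(0, Long.MAX_VALUE).print();
        env.execute("invalid-checkpoint-path-repro");
    }
}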
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
- The S3 file system connector: (no)
- Needs specific attention for Checkpointing from @curcur
Documentation
- Does this pull request introduce a new feature? (no)
CI report:
- c9981b5e6c44b73b44cbbb163523b78f2375b484 Azure: SUCCESS
Thanks for creating this PR @JesseAtSZ. IIUC, you expect an IOException when the checkpoint directory exists (fileSystem.mkdir() returns false)? I think it's a little weird, could you please explain your motivation?
@fredia In my example, I specified the checkpoint path on CentOS as /D/, so fileSystem.mkdirs() returns false and file creation fails, which means the checkpoint initialization fails. In addition, every subsequent checkpoint reports the error: Failure to finalize checkpoint. Although I set the tolerableCpFailureNumber parameter, FINALIZE_CHECKPOINT_FAILURE is ignored in checkFailureCounter() and does not count towards tolerableCpFailureNumber, so as the job progresses the checkpoints keep failing, but the user cannot perceive it.
@JesseAtSZ Thanks for your explanation. fileSystem.mkdirs() not only returns false when the directory exists, but also returns false when there is no permission to write. This PR makes sense to me.
@fredia I updated the PR, could you please review it again?
@rkhachatryan could you please review it as well?
Thanks for the PR @JesseAtSZ,
I'm trying to understand why checkpoints are failing with FINALIZE_CHECKPOINT_FAILURE (which is ignored by CheckpointFailureManager) and not something like IOException.
From the code, it might happen only in the CheckpointCoordinator, when all the tasks have already acknowledged the checkpoint.
That probably means that the job is stateless.
Could you confirm that @JesseAtSZ?
If that's NOT the case then we probably should fix the failure counting first.
Another question is related to the TM: do we need a symmetric check there (in FsCheckpointStorageAccess.resolveCheckpointStorageLocation)?
@MartijnVisser Thanks for the review! Could you please review it again? In addition, I would like to ask about our code version management: is it allowed to merge code into a released version, such as 1.14.4? I found this problem on 1.14.4.
Thanks for the review! Could you please review it again?
Sure, but @rkhachatryan's comments are more important here :)
Is it allowed to merge code into a released version, such as 1.14.4? I found this problem on 1.14.4.
Code is always released with a new version, so that would be Flink 1.14.5 if this were merged and a release were created later.
@MartijnVisser So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?
So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?
Correct, yes.
@MartijnVisser I found that some branches end in the string -rc. What does this mean?
I found that some branches end in the string -rc. What does this mean?
Release candidates, those are created prior to a release being released
@rkhachatryan FINALIZE_CHECKPOINT_FAILURE only occurs in the checkpoint coordinator. I don't think it is necessary to check the path on the TM, because the initialization on the Coordinator happens before performCheckpoint on the TM.
Thanks @JesseAtSZ, could you please confirm that the job is stateless?
the initialization on the Coordinator happens before performCheckpoint on the TM
Not always, e.g. if a default location was used by the JM (I'm not saying the check on the TM should be added, just clarifying).
@rkhachatryan Regarding the stateless job you mentioned: do you mean whether state is used in my job? FLINK-27570 was first found in a job with state; later, I constructed a demo that didn't use state, which has the same problem.
Got it, thanks. My concern is that we should probably fix the error handling in CheckpointFailureManager instead of the mkdirs call:
case FINALIZE_CHECKPOINT_FAILURE:
// ignore
break;
The documentation for execution.checkpointing.tolerable-failed-checkpoints says that it applies to any IOException on the Job Manager.
This is the case here (the IOException gets wrapped into a CheckpointException when I try it locally).
By adding a check of mkdirs we do fix this particular problem, but any other IOException (e.g. an intermittent failure when writing the _metadata file) will still be ignored by CheckpointFailureManager.
So how about counting FINALIZE_CHECKPOINT_FAILURE as a real failure in CheckpointFailureManager?
We could still fail fast if the directories could not be created, but that would just be an optimization then.
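For illustration, a simplified, self-contained sketch of the counting rule proposed above; this is not the real CheckpointFailureManager code, and the set of reasons shown is limited to the ones discussed in this thread:

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;

/** Simplified illustration of the proposed counting behaviour, not the real manager. */
final class FailureCountingSketch {

    private final int tolerableCpFailureNumber;
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);

    FailureCountingSketch(int tolerableCpFailureNumber) {
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
    }

    /** Returns true if the job should be failed because too many checkpoints failed in a row. */
    boolean handleFailure(CheckpointFailureReason reason) {
        switch (reason) {
            case FINALIZE_CHECKPOINT_FAILURE: // proposed: no longer ignored
            case IO_EXCEPTION:
            case CHECKPOINT_ASYNC_EXCEPTION:
            case CHECKPOINT_DECLINED:
            case CHECKPOINT_EXPIRED:
                return continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber;
            default:
                // benign reasons (e.g. declined because subsumed) stay ignored
                return false;
        }
    }

    /** A successful checkpoint resets the continuous failure counter. */
    void handleSuccess() {
        continuousFailureCounter.set(0);
    }
}

With this rule, a run of failed finalizations trips the tolerable-failure threshold just like IO or expiration failures do.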
@rkhachatryan That was also my question when reading the code. I'm curious why this exception was ignored; I couldn't find the original intention behind ignoring it (nor anything in the comments), so I chose to modify initializeBaseLocationsForCheckpoint instead. If you prefer to modify it like this, I can update the PR.
@pnowojski Do you have an idea why FINALIZE_CHECKPOINT_FAILURE is not counted as a checkpoint failure? Looks strange to me as well.
If there is no strong reason, I go with @rkhachatryan: we should count FINALIZE_CHECKPOINT_FAILURE as well.
The initial set of ignored failure reasons was not well thought through and has been changed/adjusted over and over again whenever we found a good example of a failure that shouldn't be ignored. So:
Do you have an idea why FINALIZE_CHECKPOINT_FAILURE is not counted as a checkpoint failure? Looks strange to me as well.
There is probably no good reason why.
If no strong reason, I go with @rkhachatryan , we should count FINALIZE_CHECKPOINT_FAILURE as well.
👍 Seems like a good change. Looking at the other failures, FINALIZE_CHECKPOINT_FAILURE fits much better with things like:
case IO_EXCEPTION:
case CHECKPOINT_ASYNC_EXCEPTION:
case CHECKPOINT_DECLINED:
case CHECKPOINT_EXPIRED:
rather than checkpoint declined/subsumed. Just remember to note this change in the release notes, as it would slightly change the behaviour of the system.
I have similar doubts: maybe TRIGGER_CHECKPOINT_FAILURE shouldn't be ignored either?
Regarding TRIGGER_CHECKPOINT_FAILURE,
I had the following concerns - but after checking the code they turned out to be wrong.
If counted as a failure:
- "not all tasks are running" could cause it - wrong: there is a separate NOT_ALL_REQUIRED_TASKS_RUNNING constant
- timing issues or existing concurrent checkpoints can produce it - wrong: they are separate constants as well
If not counted as a failure (left as is):
- if we check the mkdirs return value then it will cause TRIGGER_CHECKPOINT_FAILURE, again breaking the counter - wrong: mkdirs is called during the CheckpointCoordinator initialization
Still, I'm leaning towards leaving it as is because there are probably other edge cases when it happens and shouldn't be counted as a failure, e.g. shutdown.
I don't see any failure reasons that we should change handling of.
I'm fine with that
@rkhachatryan @MartijnVisser @curcur @pnowojski @fredia I updated the PR.
@rkhachatryan
I'm wondering whether this check is sufficient. The mkdirs javadoc says: @return true if at least one new directory has been created, false otherwise
The comment on mkdirs(final Path f) in LocalFileSystem mentions:
Recursively creates the directory specified by the provided path.
I think "at least one" refers to the recursive creation of folders: multiple folders may be created depending on the folder hierarchy, so at least one directory will be created.
In addition, Run e2e tests and Test - table have been failing all the time; what's the possible reason? It seems unrelated to this change.
I've checked it locally and found that WindowDistinctAggregateITCase fails with Exceeded checkpoint tolerable failure threshold, which is preceded by
Caused by: org.apache.flink.util.FlinkRuntimeException: The vertex GlobalWindowAggregate[28] -> Calc[29] -> LocalWindowAggregate[30] (id = 6853e3ee87fc960bbdfeb6fc8a232136) has used UnionListState, but part of its tasks has called operators' finish method.
at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(DefaultCheckpointPlan.java:187) ~[classes/:?]
at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.fulfillFinishedTaskStatus(DefaultCheckpointPlan.java:138) ~[classes/:?]
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323) ~[classes/:?]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1356) ~[classes/:?]
@gaoyunhaii Do you know what may be the reason?
It seems unrelated to this change, so I'd open a ticket for investigation and temporarily increase the TolerableCheckpointFailureNumber in the affected tests, WDYT?
@gaoyunhaii Do you know what may be the reason?
This might be an expected behavior, I'll take a second look at the test~
@gaoyunhaii did you have a chance to look at the test?
@JesseAtSZ right now it's only a single test failure, which is likely related to a particular case (finished sources). So I think it makes sense to investigate it.
@gaoyunhaii did you have a chance to look at the test?
Hi @rkhachatryan, very sorry for the delay, I'll have a conclusion on the issue today~
Hi @rkhachatryan, very sorry for the delay. I have checked the logic here; the cause of the failure is the following:
- For the final checkpoint mechanism we have some special treatment for operators using union list state: we abort the checkpoint if parts of the subtasks have finished. This avoids possible state loss and data inconsistency for union list state.
- The check happens in the finalize step: it checks all operators using union list state, and if parts of their subtasks have finished, an exception is thrown and the checkpoint fails with FINALIZE_CHECKPOINT_FAILURE.
- Since this PR now counts FINALIZE_CHECKPOINT_FAILURE as an explicit failure, some tests are affected.
Since the failure with union list state is currently by-design behavior, I tend to think we should distinguish this case from other finalize errors: for this case we could fail the checkpoint with a dedicated reason and continue not counting it.
Thanks a lot for your analysis @gaoyunhaii!
I think we can distinguish these cases in different ways:
1. checkNoPartlyOperatorsFinishedVertexUsedUnionListState returns a result value (e.g. a boolean) which is then checked up the call hierarchy
2. checkNoPartlyOperatorsFinishedVertexUsedUnionListState throws a CheckpointException with a new reason (and it is not wrapped in CheckpointCoordinator.finalizeCheckpoint)
3. checkNoPartlyOperatorsFinishedVertexUsedUnionListState throws a new type of exception; then in CheckpointCoordinator.finalizeCheckpoint we use a different failure reason for this type of exception
(1) seems the cleanest way but requires more changes (4 calls). (2) might have some unanticipated consequences because of changing the exception type. (3) seems a good compromise between invasiveness and cleanliness.
So I'd implement (3) or (1), WDYT? @JesseAtSZ, @gaoyunhaii
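For concreteness, a self-contained sketch of what option (3) could look like; the exception type, the helper class, and the chosen fallback reason are hypothetical placeholders rather than existing Flink API, and in the real code the mapping would live in CheckpointCoordinator.finalizeCheckpoint:

import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.util.FlinkRuntimeException;

/** Hypothetical sketch of option (3); names are placeholders, not existing Flink API. */
final class FinalizeFailureClassificationSketch {

    /** Hypothetical exception thrown by checkNoPartlyOperatorsFinishedVertexUsedUnionListState. */
    static class PartlyFinishedVertexWithUnionListStateException extends FlinkRuntimeException {
        PartlyFinishedVertexWithUnionListStateException(String message) {
            super(message);
        }
    }

    /** Chooses the failure reason to report when checkpoint finalization throws. */
    static CheckpointFailureReason classify(Throwable finalizationError) {
        if (finalizationError instanceof PartlyFinishedVertexWithUnionListStateException) {
            // by-design abort for union list state: keep it out of the tolerable-failure count;
            // the concrete reason here is a placeholder, the thread suggests a dedicated one
            return CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING;
        }
        // anything else is a real finalization problem and is counted after this PR
        return CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE;
    }
}

The point of the sketch is only the classification step: the by-design union-list-state abort keeps a non-counted reason, while every other finalization error is reported as FINALIZE_CHECKPOINT_FAILURE and counted.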