flink-on-k8s-operator
Improve update and savepoint handling
Purpose of this PR
Currently, savepoint handling and its related routines are scattered across several places, which makes it difficult to enhance this operator. This PR organizes them so that savepoint-related routines can be improved and extended in the future. It also improves the update, cancel and recovery features that depend on the savepoint routines.
Changes
- Make the job deploy phase explicit with new job states.
- [x] Organize savepoint routines.
- [x] Fix some savepoint related issues.
- [x] Improve update stability.
- [x] Change the job stop process that is applied when updating and canceling a job.
- [x] Elaborate update/restart strategy more.
Details
- **Organize and fix savepoint routine**
  - Organize savepoint handling and related routines in one place
  - ~~Add field to configure the latest savepoint age for update: `SavepointMaxAgeForUpdateSeconds`~~
  - Auto savepoint
    - Delete `lastSavepointTriggerTime` and `lastSavepointTriggerID`: duplicated with `status.savepoint`
    - Change the first trigger to be based on `status.job.startTime` and delete `SavepointTriggerReasonScheduledInitial`
  - Savepoint state
    - Add a routine to derive the state from the HTTP response code (see the sketch after this list)
    - Get rid of the operator's own savepoint timeout error
- **Change job stop behavior when updating and cancelling a job**
  - From Flink 1.9, a stop API that supports exactly-once semantics was introduced, but for compatibility with versions up to 1.8, "cancel with savepoint" is applied first. In the future, add a `flinkVersion` field and support "stop with savepoint" on 1.9 or higher.
  - [x] Apply the "cancel with savepoint" API
- **Improve update process**
  - `takeSavepointOnUpdate` field
    - Rename `takeSavepointOnUpgrade` to `takeSavepointOnUpdate`
    - Improve the savepoint skip routine when updating with `takeSavepointOnUpdate`
    - Fix #408
  - Improve update stability
- **Elaborate update/restart strategy**
  - Limit the job state age from which the job can be restarted when auto restarting from failure, updating a stopped job, or updating a running job with `takeSavepointOnUpdate` false
  - [x] Add a field to limit the maximum savepoint age to restore from on job restart: `MaxStateAgeToRestoreSeconds`
- **Add new job deployment states**
  - Deploying, DeployFailed, Restarting
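
For illustration, here is a minimal sketch of how deriving the savepoint state from the HTTP response code could look, assuming the operator polls Flink's savepoint status endpoint; the names and conditions below are assumptions, not the PR's actual identifiers:

```go
// Hypothetical sketch (not the operator's actual code): derive an operator-level
// savepoint state from the HTTP code returned when polling a triggered savepoint,
// so the operator no longer needs its own savepoint timeout error.
package flink

import "net/http"

type SavepointState string

const (
	SavepointStateInProgress SavepointState = "InProgress"
	SavepointStateSucceeded  SavepointState = "Succeeded"
	SavepointStateFailed     SavepointState = "Failed"
)

func savepointStateFromHTTP(code int, completed bool, failureCause string) SavepointState {
	switch {
	case code == http.StatusOK && completed && failureCause == "":
		return SavepointStateSucceeded
	case code == http.StatusOK && !completed:
		return SavepointStateInProgress
	default:
		// Unknown trigger ID (404), server errors (5xx), or a reported failure cause.
		return SavepointStateFailed
	}
}
```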
Thanks for the PRs! Let me know when it is ready for review.
@elanv Thank you for doing this! You are awesome :) I opened issue #427; I think it's the root cause for the mitigation where I introduced some of the fields you are removing here. I would appreciate your feedback there too.
Also, I see that in the `takeSavepointOnUpgrade` flow I added (nice rename BTW), the SP is taken synchronously. And I saw that while it happens, the operator fails to submit new job clusters (it just waits for the SP to finish and then submits them). Do you think it's something we can change as well, or is it too difficult ATM?
Last thing: do you think it's possible to leverage the new `SavepointMaxAgeToRestoreSeconds` (or a new field) to make the job submitter fail if no SP is provided for it? (To reduce the chance of a deployment error where a job that must start from a SP does not.)
I plan to write additional unit test code while doing enough testing. A diagram of the new job states applied in the new commit is also attached to the PR description. @shashken Thanks for the comments. I will organize my thoughts and answer later.
@shashken Sorry for the late response.
The savepoint issue is specific to the auto savepoint feature. When auto savepoint is enabled, the operator triggers a savepoint as soon as the job starts, but the savepoint fails if some tasks of the Flink job have not started yet. In that case, the savepoint status is updated as failed, and the next iteration of the reconciliation loop starts again immediately because the status change triggers it. In the following iterations, the previously failing routine is likely to be repeated because there isn't enough time between iterations. Basically, the savepoint should not be triggered as soon as the job starts, so I fixed it in this PR to trigger the first savepoint only after the configured time interval has passed.
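
For illustration, a minimal sketch of the first-trigger rule described above, using hypothetical helper and parameter names; the point is only that the first auto savepoint becomes due `autoSavepointSeconds` after `status.job.startTime` rather than immediately:

```go
// Hypothetical sketch (not the operator's actual code): the first auto savepoint
// is due only after autoSavepointSeconds have elapsed since the job start time.
package flink

import "time"

func firstSavepointDue(jobStartTime time.Time, autoSavepointSeconds int32, now time.Time) bool {
	if autoSavepointSeconds <= 0 {
		return false // auto savepoint disabled
	}
	due := jobStartTime.Add(time.Duration(autoSavepointSeconds) * time.Second)
	return !now.Before(due)
}
```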
I still do not understand in which case `takeSavepointOnUpgrade` is needed. When a Flink job is updated, it is expected that the job resumes from its last state. But when `takeSavepointOnUpgrade` is false, I'm not sure where the job should be restored from. Could you explain more about the field?
`MaxStateAgeToRestoreSeconds` is for auto restarting from failure or updating a job from stopped states. With the field, the operator will restart the Flink job only when the recorded savepoint meets the age condition. Does that fit your intention too?
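
As a rough illustration of that age condition (the names here are assumptions, not the actual implementation):

```go
// Hypothetical sketch (not the operator's actual code): allow a restart to restore
// from the recorded savepoint only if it is newer than maxStateAgeToRestoreSeconds.
package flink

import "time"

func stateFreshEnough(savepointTime time.Time, maxStateAgeToRestoreSeconds int32, now time.Time) bool {
	if savepointTime.IsZero() {
		return false // nothing recorded to restore from
	}
	maxAge := time.Duration(maxStateAgeToRestoreSeconds) * time.Second
	return now.Sub(savepointTime) <= maxAge
}
```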
@elanv I am making a PR for the concurrent reconcile calls. I will mention you there to take a look once it's ready.
Regarding the `takeSavepointOnUpgrade` flag, we need to take care of 3 situations:
- When a job fails or gets updated, what is the acceptable age for the state, to prevent human errors/bugs (`MaxStateAgeToRestoreSeconds` covers it perfectly ✔️)
- When a job is updated, when do we trigger a new SP? This is not covered by `MaxStateAgeToRestoreSeconds` (for example, say we trigger a SP every 24h and it's acceptable to restore from a SP up to 24h old, but when we update the job and the SP is 23h old, we would probably like to trigger a new one). Maybe another argument can cover this case?
- When a job is upgraded, the default should be to take a fresh SP if needed (as mentioned in the point above), but sometimes the user might not want to wait for a SP to be created, so an option to disable the SP creation on upgrade would be nice (the first 2 cases are more important IMO)
@functicons Could you review this PR? I think this work is almost complete. Now it would be nice to add the documentation and tests while proceeding with the review.
@shashken I have assumed that the job must be resumed from its last state when an update is triggered. So I thought `takeSavepointOnUpdate == true` means a savepoint must always be taken before the job is updated. Assuming that, it seems the second issue can be resolved with `takeSavepointOnUpdate == false` and `autoSavepointSeconds == 3600`. If it is not appropriate to take savepoints that often, IMO it seems more efficient to take a savepoint at update time with `takeSavepointOnUpdate == true`. If it is necessary to support a use case just like the one in the issue, the update strategy needs to be elaborated further.
And in the last commit, I applied `maxStateAgeToRestoreSeconds` to the `takeSavepointOnUpdate == false` case as well, to prevent updating a running job with a savepoint that is too old.
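
To make the combination concrete, here is a rough sketch of how the discussed fields could interact in the update decision; all struct, field, and function names are illustrative assumptions, not the operator's actual API:

```go
// Hypothetical sketch (not the operator's actual code): when takeSavepointOnUpdate
// is false, an update may reuse the last recorded savepoint only if it satisfies
// maxStateAgeToRestoreSeconds; otherwise a fresh savepoint is taken first.
package flink

import "time"

type JobSpec struct {
	TakeSavepointOnUpdate       *bool  `json:"takeSavepointOnUpdate,omitempty"`
	AutoSavepointSeconds        *int32 `json:"autoSavepointSeconds,omitempty"`
	MaxStateAgeToRestoreSeconds *int32 `json:"maxStateAgeToRestoreSeconds,omitempty"`
}

func canUpdateFromLastSavepoint(spec JobSpec, lastSavepoint time.Time, now time.Time) bool {
	if spec.TakeSavepointOnUpdate == nil || *spec.TakeSavepointOnUpdate {
		return false // a fresh savepoint will be taken before the update instead
	}
	if lastSavepoint.IsZero() {
		return false // nothing recorded to restore from
	}
	if spec.MaxStateAgeToRestoreSeconds == nil {
		return true // no age limit configured
	}
	maxAge := time.Duration(*spec.MaxStateAgeToRestoreSeconds) * time.Second
	return now.Sub(lastSavepoint) <= maxAge
}
```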