Fix pod number exception in sync mode when the reattach_on_restart parameter is False
Prevent KubernetesPodOperator from raising an exception in a rare scenario where the task runs in sync mode with the reattach_on_restart parameter set to False and the first task attempt fails because the task process is killed externally by the Kubernetes cluster or another process.
When the task is killed externally, the normal execution flow (including any try/except blocks) is broken and the task exits immediately. As a result, the pod created for the first attempt is never properly deleted or updated, which leads to the pod number exception on every subsequent attempt until the DAG fails completely.
Behavior before the fix:
1. KubernetesPodOperator starts a new task.
2. A Kubernetes pod is created to process the task.
3. For some reason the task in the pod is killed externally and exits with some code (-9, for example).
4. Since the reattach_on_restart parameter is set to False, the operator does not try to restart the task in the same pod for the next attempt and instead tries to create a new one, while the original pod still exists with the same labels.
5. The new pod is created.
6. Before continuing the task, KubernetesPodOperator tries to find the pod using the pod labels stored in the task context.
7. Two pods with these labels are found, resulting in the exception "More than one pod running with labels" (see the sketch after this list).
8. The exception continues to be raised on the following attempts.
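For illustration, here is a minimal sketch of the kind of label-based lookup described in steps 6-7, using the standard kubernetes Python client. The function name find_task_pod and the example label selector are assumptions for this sketch, not the provider's actual implementation:

```python
from typing import Optional

from kubernetes import client, config

from airflow.exceptions import AirflowException


def find_task_pod(v1: client.CoreV1Api, namespace: str, label_selector: str) -> Optional[client.V1Pod]:
    """Look up the pod for this task instance by its labels."""
    pods = v1.list_namespaced_pod(namespace=namespace, label_selector=label_selector).items
    if len(pods) > 1:
        # The leftover pod from the killed first attempt still carries the same
        # labels as the freshly created pod, so both match the selector.
        raise AirflowException(f"More than one pod running with labels: {label_selector}")
    return pods[0] if pods else None


# Example usage (label keys are illustrative):
# config.load_incluster_config()
# pod = find_task_pod(client.CoreV1Api(), "default", "dag_id=my_dag,task_id=my_task")
```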
Behavior after the fix:
1-6. Same behavior.
7. Two pods with these labels are found.
8. If reattach_on_restart is False, we loop through the pods, pick the one that was created last, and assign it to be used for the current attempt (see the sketch after this list).
9. The labels of the previous pod are updated and, depending on the value of the on_finish_action parameter, the pod is either kept or removed.
10. The task continues without the exception.
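A minimal sketch of the selection in step 8, assuming the pods are V1Pod objects from the kubernetes Python client; pick_latest_pod is a hypothetical helper name, not the one used in the provider:

```python
def pick_latest_pod(pods):
    """Pick the most recently created pod for the current attempt.

    Returns the newest pod plus the older leftovers, which then need their
    labels updated and, depending on on_finish_action, deletion.
    """
    ordered = sorted(pods, key=lambda p: p.metadata.creation_timestamp)
    *older, latest = ordered
    return latest, older
```

The older pods are then relabeled so they no longer match the task's label selector, and are kept or removed according to on_finish_action.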
Incorrect use of find_pod is causing this problem. Use the V1Pod response object returned by create_pod for further operations instead of calling find_pod, and clean up the existing pods before starting the new one.
@jedcunningham, @hussein-awala WDYT
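For reference, a hedged sketch of what that suggestion could look like with the standard kubernetes Python client, reusing the V1Pod returned by create_namespaced_pod instead of searching by labels afterwards; start_pod is a hypothetical helper, not the provider's API:

```python
from kubernetes import client


def start_pod(v1: client.CoreV1Api, pod: client.V1Pod, namespace: str) -> client.V1Pod:
    """Create the pod and keep the API response for all later operations."""
    created = v1.create_namespaced_pod(body=pod, namespace=namespace)
    # `created` is a fully populated V1Pod (name, uid, resourceVersion), so later
    # status checks or deletion can target it directly by name instead of
    # searching for it by labels.
    return created
```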
@dirrao It does not seem to me that cleanup() was designed to be run at the beginning of the execute_sync() method, given the many checks it contains. Should I refactor it, or just create another method and call it at the start of execute_sync()?
Also, in the case of reattach_on_restart=False we still need to run find_pod() to actually find out that there is an extra pod left over from a previous task attempt, and then update its labels before calling cleanup() or something with similar functionality. Otherwise it won't be called at all.
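For illustration, a rough sketch of that relabeling step, assuming the kubernetes Python client; the already_checked label key and the mark_pod_as_checked helper are illustrative, not necessarily what the provider uses:

```python
from kubernetes import client


def mark_pod_as_checked(v1: client.CoreV1Api, pod: client.V1Pod) -> None:
    """Relabel the leftover pod so the next find_pod() call no longer matches it."""
    patch = {"metadata": {"labels": {"already_checked": "True"}}}
    v1.patch_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body=patch,
    )
```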
@jedcunningham @hussein-awala could you take a look please?
Hi @potiuk @hussein-awala! Can you please check the changes one more time? Thanks!
@eladkal, this fix is kinda urgent. If it is merged soon, can we please also include it in the Google provider release that you said we will have this weekend with the AutoML changes? Thank you :)
Tests are failing. You will need to add test_exceptions.py.
I think we have a similar convention in the Docker provider and other providers, so you can check the tests we have there for reference.