
Fix race condition in KubernetesExecutor with concurrently running schedulers

Open droppoint opened this issue 2 years ago • 16 comments

Closes: #32928

A race condition occurs in the _adopt_completed_pods function when schedulers are running concurrently. The function doesn't keep track of which scheduler went down, so it constantly tries to adopt completed pods from normally working schedulers (see the sketch after the list below). On Airflow setups with concurrently running schedulers and a lot of short-lived DAGs, this leads to a race condition and an open-slot leak. You can find a detailed analysis of this situation in the GitHub issue here (https://github.com/apache/airflow/issues/32928#issuecomment-1820413530). Even if one of the schedulers goes down and leaves some pods in the Completed state, it is not a big deal, because:

  1. Completed pods do not consume resources from the Kubernetes cluster.
  2. They can be deleted by the airflow cleanup-pods CLI command, which you can run as a cronjob.
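
For illustration, the problematic pattern looks roughly like this (a simplified sketch, not the actual executor source; it assumes the airflow-worker label on a worker pod carries the job id of the scheduler that queued it):

```python
from kubernetes.client.rest import ApiException


def _adopt_completed_pods(self, kube_client) -> None:
    """Simplified sketch: adopt every completed pod queued by another scheduler.

    There is no check whether that other scheduler is still alive, which is
    exactly the race described above when several schedulers run concurrently.
    """
    my_id = str(self.scheduler_job_id)
    pod_list = kube_client.list_namespaced_pod(
        namespace=self.kube_config.kube_namespace,
        field_selector="status.phase=Succeeded",
        # every completed worker pod that is not already labelled with this scheduler's id
        label_selector=f"kubernetes_executor=True,airflow-worker!={my_id}",
    )
    for pod in pod_list.items:
        # Re-label the pod so this scheduler becomes its owner, even if the
        # scheduler that originally queued it is perfectly healthy and is about
        # to process this same completed pod itself.
        pod.metadata.labels["airflow-worker"] = my_id
        try:
            kube_client.patch_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=pod,
            )
        except ApiException as e:
            self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
```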

Co-authored-by: Vlad Pastushenko [email protected]

droppoint avatar Nov 22 '23 15:11 droppoint

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst). Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://s.apache.org/airflow-slack

boring-cyborg[bot] avatar Nov 22 '23 15:11 boring-cyborg[bot]

Ok so the fix is, don't try to adopt completed pods.

You mention that there's no big consequence if we don't do this, because we can periodically delete completed pods with a cron job.

But what about task state? If a pod succeeds and its scheduler dies at roughly the same time, will the task appear to be running forever? What happens?

dstandish avatar Nov 22 '23 17:11 dstandish

The completed pods are not a problem because they consume no resources, and they will be deleted during the airflow cleanup-pods cronjob execution. However, a TaskInstance can get stuck in the running state until the timeout. I'm not sure about this, so I'll run some experiments on my Airflow setup and come back with more data.

droppoint avatar Nov 23 '23 12:11 droppoint

In case it is important to adopt pods but we need a fix to do it safely, here are some ideas I'm throwing out before I disappear for the holidays.

The scheduler can know which other schedulers are live. So, perhaps on each loop it could look up the active schedulers. Then, when looking at a pod, it can see the scheduler job ID (I think it's a label, e.g. queued_by or something) and skip adopting pods that belong to active schedulers.

Or, perhaps safer: queue up the "adoption candidates" in a list, and only after reading all the pods and finding the candidates, look up the active scheduler IDs at that moment and skip adopting candidates whose scheduler is active.

Another idea would be to have the scheduler periodically patch its pods with a timestamp; then we could look at that when adopting pods and not adopt one unless it hasn't been touched in a long time.

Last idea: only adopt completed pods that completed more than 5 minutes ago. Maybe this is the simplest.

All of these have performance implications which need to be considered.
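
For example, the "collect candidates first, then check liveness" idea might look roughly like this (just a sketch; _get_alive_scheduler_job_ids is a hypothetical helper that would query the metadata DB for schedulers that are still heartbeating, and the airflow-worker label is assumed to hold the queueing scheduler's job id):

```python
def _adopt_completed_pods(self, kube_client) -> None:
    """Sketch of the 'collect candidates first, then check liveness' idea."""
    # 1. Read all completed worker pods first, without adopting anything yet.
    candidates = kube_client.list_namespaced_pod(
        namespace=self.kube_config.kube_namespace,
        field_selector="status.phase=Succeeded",
        label_selector="kubernetes_executor=True",
    ).items

    # 2. Only after reading the candidates, take a snapshot of the schedulers
    #    that are currently alive (hypothetical helper).
    alive_ids = {str(i) for i in self._get_alive_scheduler_job_ids()}

    for pod in candidates:
        owner_id = pod.metadata.labels.get("airflow-worker")
        if owner_id in alive_ids:
            # The scheduler that queued this pod is healthy; leave the pod to it.
            continue
        # Otherwise adopt the orphaned pod as today (re-label and patch it).
        ...
```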

dstandish avatar Nov 23 '23 15:11 dstandish

> The completed pods are not a problem because they consume no resources, and they will be deleted during the airflow cleanup-pods cronjob execution. However, a TaskInstance can get stuck in the running state until the timeout. I'm not sure about this, so I'll run some experiments on my Airflow setup and come back with more data.

Hi @droppoint let us know what you find

dstandish avatar Nov 28 '23 16:11 dstandish

> Hi @droppoint let us know what you find

Yeah. I might even find some time to follow the discussion and contribute to it as well soon.

potiuk avatar Nov 29 '23 02:11 potiuk

> Hi @droppoint let us know what you find

My team and I ran an experiment that demonstrated that even if the scheduler shuts down abnormally, the TaskInstance still completes normally. This observation also applies to the DagRun and the LocalTaskJob of this TaskInstance. The TaskInstance completes normally because its state is changed by the _run_raw_task function from within the worker pod.

Here's a step-by-step breakdown of our experiment:

  0. Set the number of schedulers in the namespace to 2.
  1. Create a DAG that sleeps for 5 minutes.
  2. Set orphaned_tasks_check_interval to 20 minutes.
  3. Run the DAG on scheduler №1.
  4. Wait until DAGRun/Job/TaskInstance/Pod is in the "Running" state.
  5. Kill scheduler №1 and prevent its restart.
  6. Wait until the pod is in the Completed state.
  7. Wait until adoption starts on scheduler №2.
  8. Wait until the cleanup-pods cronjob starts.

Results:

  • TaskInstance/DAGRun/Job status changed to "success" after step 6 but before step 7.
  • The pod was deleted only after step 8.

So, pods that were completed after a scheduler's abnormal shutdown do not lead to TaskInstance/DagRun/Job failure, even if they were not "adopted." While the pod was deleted after step 8 by the cleanup-pods cronjob, I understand the concern raised by @JCoder01 that we need to clean up pods properly even in this case. In the next step, we'll attempt to implement a new version of the _adopt_completed_pods function that retrieves IDs of working SchedulerJobs and deletes all pods in the Completed state that don't belong to "running" SchedulerJobs, as @dstandish suggested. We'll test this solution on our Airflow setup and provide more information approximately next week.
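
Roughly, the new function we have in mind would look something like the sketch below (not a final implementation; it assumes running SchedulerJobs can be read from the Job table and that the airflow-worker label on a completed pod carries the id of the scheduler that queued it):

```python
from airflow.jobs.job import Job
from airflow.utils.session import create_session


def _delete_orphaned_completed_pods(self, kube_client) -> None:
    """Sketch: delete completed pods whose queueing scheduler is no longer running."""
    with create_session() as session:
        running_scheduler_ids = {
            str(job_id)
            for (job_id,) in session.query(Job.id).filter(
                Job.job_type == "SchedulerJob",
                Job.state == "running",
            )
        }

    completed_pods = kube_client.list_namespaced_pod(
        namespace=self.kube_config.kube_namespace,
        field_selector="status.phase=Succeeded",
        label_selector="kubernetes_executor=True",
    ).items

    for pod in completed_pods:
        if pod.metadata.labels.get("airflow-worker") in running_scheduler_ids:
            # The owning scheduler is alive; it will clean this pod up itself.
            continue
        kube_client.delete_namespaced_pod(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
        )
```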

droppoint avatar Nov 30 '23 14:11 droppoint

Nice findings. Looks promising. Thanks for that.

Ok here's another scenario.

The task OOMs and therefore cannot report its state by itself.

dstandish avatar Nov 30 '23 16:11 dstandish

We've refactored the _adopt_completed_pods function into a _delete_orphaned_completed_pods function, and now it properly removes completed pods left behind by failed schedulers.

Here's a step-by-step breakdown of our test:

  1. Set the number of schedulers in the namespace to 2.
  2. Create a DAG that sleeps for 5 minutes.
  3. Set orphaned_tasks_check_interval to 20 minutes.
  4. Run the DAG on scheduler №1.
  5. Wait until DAGRun/Job/TaskInstance/Pod is in the "Running" state.
  6. Kill scheduler №1 and prevent its restart.
  7. Wait until the pod is in the Completed state.
  8. Wait until adoption starts on scheduler №2.
  9. Wait until the cleanup-pods cronjob starts.

Results:

  • TaskInstance/DAGRun/Job status changed to "success" after step 7 but before step 8.
  • The pod was deleted after step 8 but before step 9.

> The task OOMs and therefore cannot report its state by itself.

We'll check that in a few days

droppoint avatar Dec 07 '23 15:12 droppoint

Hey @hussein-awala @dstandish -> would love to get that one merged, maybe you two can take a look. I will also just be releasing the cncf.k8s provider, I think, and together with #35675 we have a chance to have a much more robust K8s Executor :)

potiuk avatar Dec 20 '23 15:12 potiuk

> Nice findings. Looks promising. Thanks for that.
>
> Ok here's another scenario.
>
> The task OOMs and therefore cannot report its state by itself.

I believe in this scenario the pod might be moved to the failed state: the failed event is generated, then the task state is updated to failed and the pod is deleted by the scheduler.

@droppoint it's worth capturing all the scenarios and ensuring it works for every one of them.

dirrao avatar Dec 20 '23 17:12 dirrao

@droppoint can you address @hussein-awala's comment? It looks like this PR is almost complete. I hope to get it merged for the next release.

eladkal avatar Jan 16 '24 08:01 eladkal

Are there any other open questions on this one?

dstandish avatar Feb 26 '24 22:02 dstandish

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Apr 14 '24 00:04 github-actions[bot]

Can anyone look into this PR? In Airflow 2.8.x users face an Executor slot leak issue, and I faced the same. Could the above PR fix it? https://github.com/apache/airflow/issues/36998 https://github.com/apache/airflow/issues/38968 cc @paramjeet01

changqian9 avatar Apr 14 '24 07:04 changqian9

@changqian9 it looks like the original author may have gotten busy or lost motivation. Anyone could pick it up and see it through if motivated.

dstandish avatar Apr 25 '24 18:04 dstandish