task-aware celery worker autoscaling (+ `pod-deletion-cost`)

Open thesuperzapper opened this issue 4 years ago • 6 comments

The chart currently supports primitive autoscaling for celery workers, using HorizontalPodAutoscalers with memory metrics. But this is very flawed, as there is not necessarily a link between RAM usage and the number of pending tasks, so your workers might not scale up despite there being pending tasks.

We can make a task-aware autoscaler that will scale up the number of celery workers when there are not enough task slots, and scale down when there are too many.

In the past, scale down was dangerous to use with airflow workers, as Kubernetes had no way to influence which Pods were removed, meaning Kubernetes would often remove a busy worker even when there were workers doing nothing.

As of Kubernetes 1.22, there is a beta annotation for Pods managed by ReplicaSets called controller.kubernetes.io/pod-deletion-cost, which tells Kubernetes how "expensive" killing a particular Pod is when decreasing the replicas count.
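
As a minimal sketch of what setting this annotation could look like, the snippet below builds a patch body for a single Pod. The helper name `deletion_cost_patch` is hypothetical; the string encoding and the 32-bit value range follow the Kubernetes documentation for the annotation, where Pods with a lower cost are preferred for deletion.

```python
import json

# Annotation name taken from the issue text above.
POD_DELETION_COST = "controller.kubernetes.io/pod-deletion-cost"

# Kubernetes documents the accepted range as [-2147483647, 2147483647].
_MIN_COST, _MAX_COST = -2147483647, 2147483647


def deletion_cost_patch(cost):
    """Build a merge-patch body that sets the annotation, or clears it when cost is None.

    Hypothetical helper for illustration; not part of the chart.
    """
    if cost is None:
        value = None  # a null value removes the key in a merge patch
    else:
        clamped = max(_MIN_COST, min(_MAX_COST, int(cost)))
        value = str(clamped)  # annotation values must be strings
    return {"metadata": {"annotations": {POD_DELETION_COST: value}}}


if __name__ == "__main__":
    print(json.dumps(deletion_cost_patch(120), indent=2))
    print(json.dumps(deletion_cost_patch(None), indent=2))
```

With the official Python client, a body like this could presumably be passed to `CoreV1Api.patch_namespaced_pod(name, namespace, body)`; that call is not made here so the sketch runs without a cluster.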

NOTE: Previously we considered using KEDA (https://github.com/airflow-helm/charts/issues/103) to manage autoscaling, but this will not work with controller.kubernetes.io/pod-deletion-cost, as the HorizontalPodAutoscaler created by KEDA cannot patch the required annotations BEFORE scaling down.


Our Celery Worker Autoscaler can perform the following loop:

  1. Cleanup from any past loops:
    1. Remove any controller.kubernetes.io/pod-deletion-cost annotations
      • NOTE: there will only be dangling annotations if Kubernetes did not remove our "chosen" Pods, or if the autoscaler crashed halfway through a loop
      • NOTE: we need to attempt to prevent multiple instances of our autoscaler running at a time
    2. Send an app.control.add_consumer() command to each worker Pod that we removed an annotation from, so it resumes picking up new airflow tasks
  2. Calculate the ideal number of worker replicas for the current task load:
    • if the load factor of workers is above A for B time --> increase replicas to meet the target load factor
    • if the load factor of workers is below X for Y time --> decrease replicas to meet the target load factor
      • NOTE: the load factor is the fraction of available task slots which are consumed
      • NOTE: we should put some limit on the number of scaling decisions per A seconds (to prevent a yo-yo effect), (perhaps have separate limits for down and up to allow faster upscaling)
      • NOTE: we should have a "scaling algorithm" config, even if we only start with 1
      • NOTE: we should have minimum and maximum replicas configs
      • NOTE: if using CeleryKubernetesExecutor, we must exclude tasks that are in the AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE
  3. If replicas are going to be decreased by N:
    1. Sort the worker pods by their pod-deletion-cost in ascending order
      • NOTE: the pod-deletion-cost is the number of running tasks, weighted by the total running time of each task (so long-running tasks are not needlessly evicted). Specifically, we want smaller numbers of long-running tasks to be weighted higher than larger numbers of short-running tasks
      • NOTE: add a DAG/Task label which will prevent any worker running it from being killed (or allow a "weighting" per Task)
    2. Annotate the N lowest-cost worker Pods with the controller.kubernetes.io/pod-deletion-cost annotation
      • NOTE: if there are pods in a Pending/Unready state, we can reduce N by this number, as Kubernetes will remove these pods first
    3. Send each worker Pod that was annotated an app.control.cancel_consumer(...) command, so it does not pick up new airflow tasks after being "marked" for deletion
    4. Patch the replicas down by N
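
Steps 2 and 3 of the loop could be sketched roughly as follows. This is only an illustration under stated assumptions: the names `Worker`, `load_factor`, `desired_replicas`, `deletion_cost` and `pick_victims` are hypothetical, the squared-minutes weighting is just one possible way to make a few long-running tasks outweigh many short ones, and the "for B time" conditions, rate limiting, Celery control commands and Kubernetes patching are all left out.

```python
import math
from dataclasses import dataclass, field


@dataclass
class Worker:
    name: str
    slots: int  # task slots on this worker (its celery concurrency)
    task_runtimes: list = field(default_factory=list)  # seconds each running task has run
    ready: bool = True  # False for Pending/Unready pods


def load_factor(workers):
    """Fraction of all task slots that are currently consumed."""
    total = sum(w.slots for w in workers)
    used = sum(len(w.task_runtimes) for w in workers)
    return used / total if total else 0.0


def desired_replicas(used_slots, pending_tasks, slots_per_worker, target,
                     min_replicas, max_replicas):
    """Replicas needed so that (running + pending) / total slots is about `target`.

    Assumes every worker has the same number of slots.
    """
    demand = used_slots + pending_tasks
    wanted = math.ceil(demand / (slots_per_worker * target))
    return max(min_replicas, min(max_replicas, wanted))


def deletion_cost(worker):
    # Hypothetical weighting: squaring each runtime (in minutes) means one
    # long-running task costs more than many short-running tasks.
    return int(sum((t / 60.0) ** 2 for t in worker.task_runtimes))


def pick_victims(workers, n):
    """Names of the ready workers to mark when scaling down by n."""
    unready = sum(1 for w in workers if not w.ready)
    n = max(0, n - unready)  # per the note above, unready pods are removed first
    ready = sorted((w for w in workers if w.ready), key=deletion_cost)
    return [w.name for w in ready[:n]]


if __name__ == "__main__":
    workers = [
        Worker("w-0", 16, [7200.0]),      # one 2-hour task
        Worker("w-1", 16, [30.0] * 10),   # ten 30-second tasks
        Worker("w-2", 16, []),            # idle
        Worker("w-3", 16, [], ready=False),
    ]
    used = sum(len(w.task_runtimes) for w in workers)
    target = desired_replicas(used, 0, 16, 0.75, 1, 10)
    print("load factor:", load_factor(workers))
    print("scale down by:", len(workers) - target)
    print("mark for deletion:", pick_victims(workers, len(workers) - target))
```

In this example the idle worker and the worker with ten short tasks are marked before the worker holding the single long-running task, which is the ordering the pod-deletion-cost note asks for.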

Important changes to make this work:

  • We will need to use a Deployment for the workers (rather than a StatefulSet), as controller.kubernetes.io/pod-deletion-cost is only for Pods in ReplicaSets
  • Because controller.kubernetes.io/pod-deletion-cost is alpha in 1.21 and beta in 1.22, for older Kubernetes versions we can let users use the CloneSet from the CNCF project called OpenKruise (instead of Deployment), as they have back-ported the controller.kubernetes.io/pod-deletion-cost annotation.

thesuperzapper avatar Jul 30 '21 09:07 thesuperzapper

Interesting approach 👀

potiuk avatar Sep 24 '22 00:09 potiuk

Can't wait to see this in action. Please let us know once this is available.

NitinKeshavB avatar Jun 30 '23 12:06 NitinKeshavB

> Can't wait to see this in action. Please let us know once this is available.

@NitinKeshavB I agree, I am sorry it's taken so long!

I actually have a mostly working prototype, but I have paused work on it until I can get the first release of deployKF (a new open-source ML Platform for Kubernetes, which will include Airflow) out the door.

After that, it is top of my list!

thesuperzapper avatar Jul 01 '23 03:07 thesuperzapper

Ping :D would this support scale to 0 by any chance?

brtkwr avatar Dec 14 '23 10:12 brtkwr

Hi @thesuperzapper, I'm very interested in this feature as well, and I see that you recently added a new Kubernetes proposal related to controller.kubernetes.io/pod-deletion-cost. I don't fully grasp the details, but will that change this approach as well? Perhaps more pertinently, will the implementation of this approach depend on the implementation of that Kubernetes proposal?

lexey-e-shelf avatar Feb 28 '24 04:02 lexey-e-shelf