
Expose workers_to_close as a HTTP route

psontag opened this issue 2 years ago · 9 comments

With #6270 we now have a first set of HTTP routes exposed on the scheduler. Currently there are two modes for the /api/v1/retire_workers route (sketched below):

  1. Retire n workers based on the workers_to_close method of the scheduler
  2. Retire the provided workers based on their address
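
For reference, a minimal sketch of calling the existing route from Python. This assumes the API routes are enabled on the scheduler's HTTP server and that the JSON body is passed through as keyword arguments to Scheduler.retire_workers; the addresses below are placeholders:

```python
import requests

SCHEDULER_HTTP = "http://scheduler-address:8787"  # placeholder scheduler HTTP address

# Mode 1: let the scheduler pick 2 workers via its workers_to_close logic
resp = requests.post(f"{SCHEDULER_HTTP}/api/v1/retire_workers", json={"n": 2})
print(resp.json())

# Mode 2: retire specific workers by their address
resp = requests.post(
    f"{SCHEDULER_HTTP}/api/v1/retire_workers",
    json={"workers": ["tcp://10.0.0.5:40331", "tcp://10.0.0.6:37221"]},  # placeholder worker addresses
)
print(resp.json())
```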

But there is no API yet that exposes the workers_to_close method directly. At least in our use case that would be really helpful, since our graceful shutdown implementation has two phases (sketched after the list):

  1. Retrieve workers to close from the scheduler and do some custom graceful shutdown logic
  2. Actually make the retire_workers call to the scheduler.
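
Until such a route exists, a rough sketch of that two-phase flow with the Python client, assuming workers_to_close is reachable as a scheduler comm handler (the address and n are placeholders):

```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address

# Phase 1: ask the scheduler which workers it would pick, without retiring anything yet
to_close = client.sync(client.scheduler.workers_to_close, n=2)

# ... custom graceful shutdown logic for the workers in `to_close` goes here ...

# Phase 2: actually retire those workers at the scheduler
client.retire_workers(workers=to_close, close_workers=False)
```

An HTTP route for workers_to_close would let us do phase 1 without opening a client connection at all.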

Background

We have implemented some custom shutdown logic for Dask workers on Kubernetes. When we need to scale down the number of Dask workers we query the scheduler for which workers we should shut down. These workers are then marked for termination (via an annotation on the deployment object of the worker) and then retired at the scheduler. After a configured time the worker deployments are deleted.

When we retire the workers at the scheduler we set the close_workers option to False, otherwise the workers would be restarted by Kubernetes and try to reconnect to the scheduler. But this also means we need a way to clean up all the retired workers, hence the annotation on the deployments. We also don't want to set the annotations after making the retire_workers call, to avoid leaving behind any zombie workers that are no longer usable.
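
For context, a rough sketch of the marking step with the kubernetes Python client; the deployment name, namespace, and annotation key below are made-up placeholders for our internal setup:

```python
import time
from kubernetes import client as k8s, config

config.load_incluster_config()  # or load_kube_config() when running outside the cluster
apps = k8s.AppsV1Api()

# Mark the worker deployment for later deletion *before* retiring it at the scheduler
apps.patch_namespaced_deployment(
    name="dask-worker-abc123",  # placeholder deployment name
    namespace="dask",           # placeholder namespace
    body={"metadata": {"annotations": {
        "example.com/terminate-after": str(int(time.time()) + 300)  # placeholder annotation key
    }}},
)

# ...then call retire_workers(..., close_workers=False) on the scheduler; a separate
# cleanup loop deletes annotated deployments once their deadline has passed.
```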

@jacobtomlinson

psontag avatar May 23 '22 09:05 psontag

I have no issue exposing the workers_to_close method.

I would be interested to dig more into your shutdown procedure though, I know we've discussed this before but I'm still curious if there is a different approach we could take.

When we call retire_workers the scheduler handles the graceful shutdown and the worker process exits. Ideally this should allow the Kubernetes Pod to go into a Completed phase. I've noticed while working on the dask-kubernetes operator that it does not: the pod is restarted, but the scheduler does not allow the worker to reconnect because it has been retired, so the pod stays running and just never rejoins the Dask cluster. At the moment we are just nuking the pod as soon as the retire_workers call comes back which isn't especially graceful.

I'm keen to try and fix things so the worker pods go to Completed once they retire. I see value in leaving worker pods as Completed since the logs will always be accessible. Due to the ownership hooks these will be removed when the DaskCluster resource is deleted anyway. This is similar to the way a Job leaves the pod in a Completed state.

Do you see any issues with this approach? I remember you saying there was a reason why you had to put your Pods in a Deployment; why was that?

jacobtomlinson avatar May 23 '22 09:05 jacobtomlinson

When we call retire_workers the scheduler handles the graceful shutdown and the worker process exits. Ideally this should allow the Kubernetes Pod to go into a Completed phase. I've noticed while working on the dask-kubernetes operator that it does not: the pod is restarted, but the scheduler does not allow the worker to reconnect because it has been retired, so the pod stays running and just never rejoins the Dask cluster.

This is most likely related to the container restart policy. By default it is set to Always. We haven't experimented with the other options yet but I am not sure if they are that useful in this scenario.
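
For illustration, the field in question when building the pod spec with the kubernetes Python client (a sketch; the image and args are placeholders). Note that pods managed by a Deployment only allow Always, which is presumably part of why the alternatives don't help much here:

```python
from kubernetes import client as k8s

# Pod-level restartPolicy applies to all containers in the pod; the default
# "Always" is what causes the retired worker container to be restarted.
pod_spec = k8s.V1PodSpec(
    restart_policy="OnFailure",  # or "Never"; default is "Always"
    containers=[
        k8s.V1Container(
            name="worker",
            image="ghcr.io/dask/dask:latest",  # placeholder image
            args=["dask-worker", "tcp://scheduler-address:8786"],  # placeholder scheduler address
        )
    ],
)
```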

At the moment we are just nuking the pod as soon as the retire_workers call comes back which isn't especially graceful.

Couldn't this take a long time? I am not sure if it is a good idea to wait that long in the kopf handler. Generally it is recommended to keep them short.

I'm keen to try and fix things so the worker pods go to Completed once they retire. I see value in leaving worker pods as Completed since the logs will always be accessible.

In our case this is not a problem since we have an observability stack that collects all the pod logs. But I see that this could be valuable for an open source operator.

Due to the ownership hooks these will be removed when the DaskCluster resource is deleted anyway. This is similar to the way a Job leaves the pod in a Completed state.

In our use cases the Dask resources generally don't get deleted. They keep being used periodically for computations.

Since we are using istio every worker pod consists of two containers (worker and istio-proxy). The istio-proxy currently has no way to detect that the worker container has shut down and completed. This means it will run indefinitely (see https://github.com/istio/istio/issues/6324). There are some workarounds available but they all require custom logic.

So to avoid having these zombie workers we add an annotation to the worker deployments and delete them after some time.

psontag avatar May 23 '22 11:05 psontag

This is most likely related to the container restart policy. By default it is set to Always. We haven't experimented with the other options yet but I am not sure if they are that useful in this scenario.

Ah yeah! I was thinking of the OnFailure functionality.

Couldn't this take a long time? I am not sure if it is a good idea to wait that long in the kopf handler. Generally it is recommended to keep them short.

The plan is for the API call to return immediately. So the way we are nuking them right now is premature and we need to fix that. Ideally, we call the API and then at some later time the pod will go into a Completed state and requires no further interaction from the operator.

In our use cases the Dask resources generally don't get deleted. They keep being used periodically for computations.

It's interesting that you reuse clusters. Is this because cluster start time is prohibitive? Or just a workflow that feels natural? I typically think of Dask clusters as ephemeral runtimes that you conjure up when you need one.

Since we are using istio every worker pod consists of two containers (worker and istio-proxy). The istio-proxy currently has no way to detect that the worker container has shut down and completed. This means it will run indefinitely (see https://github.com/istio/istio/issues/6324). There are some workarounds available but they all require custom logic.

I guess this is the core of the problem. There seems to be a lot of discussion in Kubernetes on how to get sidecars to behave correctly in Job-style resources. This KEP seems like a good solution but is not going to exist for some time: https://github.com/kubernetes/enhancements/issues/2872.

One workaround could be for the operator to watch all worker pods and, if the worker container has completed, handle closing the rest of the containers. This decouples the cleanup from the retirement process and could easily be removed in the future. This is similar to your deployment annotation but would be based on pod and container statuses instead.
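
A rough sketch of that watcher with the kubernetes Python client; the namespace, label selector, and container name are assumptions, and deleting the pod is just the simplest way to close the remaining containers (asking the istio sidecar to exit would be another option):

```python
from kubernetes import client as k8s, config, watch

config.load_incluster_config()
core = k8s.CoreV1Api()

# Watch worker pods; once the worker container has terminated, clean up the pod
# so lingering sidecars (e.g. istio-proxy) don't keep it running forever.
w = watch.Watch()
for event in w.stream(
    core.list_namespaced_pod,
    namespace="dask",                            # placeholder namespace
    label_selector="dask.org/component=worker",  # placeholder label selector
):
    pod = event["object"]
    statuses = pod.status.container_statuses or []
    worker_done = any(
        s.name == "worker" and s.state.terminated is not None for s in statuses
    )
    if worker_done:
        core.delete_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
```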

jacobtomlinson avatar May 24 '22 09:05 jacobtomlinson

The plan is for the API call to return immediately. So the way we are nuking them right now is premature and we need to fix that. Ideally, we call the API and then at some later time the pod will go into a Completed state and requires no further interaction from the operator.

We would like to limit the time the worker is allowed to take to retire. Is there any way to do this via Dask? Since our Kubernetes clusters are shared across multiple teams we might need to downscale certain Dask workloads to ensure other workloads also get a fair share of the available resources. So the shutdown process of a worker should not take much longer than 60-90s.
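
In case it helps as a stopgap, a client-side sketch that bounds the retire call with asyncio and falls back to a hard termination if it overruns (the 90s budget and the fallback are placeholders):

```python
import asyncio
from distributed import Client

async def retire_with_deadline(address: str, workers: list, timeout: float = 90):
    async with Client(address, asynchronous=True) as client:
        try:
            # Ask the scheduler to gracefully retire the workers, but don't wait
            # longer than the downscaling budget allows.
            await asyncio.wait_for(
                client.retire_workers(workers=workers, close_workers=False),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Past the deadline: fall back to whatever hard termination the
            # platform uses (e.g. deleting the worker deployments directly).
            ...
```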

It's interesting that you reuse clusters. Is this because cluster start time is prohibitive? Or just a workflow that feels natural? I typically think of Dask clusters as ephemeral runtimes that you conjure up when you need one.

I think with the internal platform we built it's the natural workflow that developed over time. In general we scale down Dask clusters that are inactive to free up resources for others, or disable them (but usually they don't get deleted). But for production workloads it can be important to have all the workers already running to ensure we hit our SLAs.

One workaround could be for the operator to watch all worker pods and, if the worker container has completed, handle closing the rest of the containers. This decouples the cleanup from the retirement process and could easily be removed in the future. This is similar to your deployment annotation but would be based on pod and container statuses instead.

This is indeed an option that we already use in a different operator that we have for Kubernetes Jobs that are part of the istio mesh.

Another reason why we use Deployments for the workers is that we don't have to babysit the worker pods. Pods can die for a number of reasons (eviction, node failure, etc.). With Deployments these pods are automatically recreated. If we created the pods ourselves via the operator we would have to keep track of the currently running workers and make sure it matches the desired set of workers.

psontag avatar May 25 '22 09:05 psontag

I don't think we support a timeout when retiring workers today.

Thanks for the other info, lots to think about.

jacobtomlinson avatar May 25 '22 11:05 jacobtomlinson

When we call retire_workers the scheduler handles the graceful shutdown and the worker process exits. Ideally this should allow the Kubernetes Pod to go into a Completed phase. I've noticed while working on the dask-kubernetes operator that it does not: the pod is restarted, but the scheduler does not allow the worker to reconnect because it has been retired, so the pod stays running and just never rejoins the Dask cluster.

This is most likely related to the container restart policy. By default it is set to Always. We haven't experimented with the other options yet but I am not sure if they are that useful in this scenario.

I have a use case where I'd like to be able to restart a specific worker in order to mitigate a third-party library's intermittent deadlocking, but calling retire_workers() doesn't cause the pod to restart. Is there an extra step I'm missing here?

jcary741 avatar Jul 07 '22 12:07 jcary741

@jcary741 This issue is discussing the HTTP routes specifically. Are you using /api/v1/retire_workers or are you calling retire_workers() in Python? If it's Python would you mind opening a new issue?

jacobtomlinson avatar Jul 14 '22 15:07 jacobtomlinson

@jacobtomlinson I agree that a new issue should be opened to handle the issues of zombie workers following retire_workers, but I think you (or maybe @philipp-sontag-by) should be the one to open it since you were first to identify the issue and how it relates to dask-kubernetes. If I were to try to open an issue, it would just be me quoting from this thread.

jcary741 avatar Jul 14 '22 15:07 jcary741

@jcary741 I think given that you have a use case where this happens it would be valuable for you to raise it. But feel free to quote from here too.

jacobtomlinson avatar Jul 19 '22 10:07 jacobtomlinson