dask-kubernetes icon indicating copy to clipboard operation
dask-kubernetes copied to clipboard

Support adaptive scaling in Helm cluster

Open Jeffwan opened this issue 5 years ago • 7 comments

As I understand, the different between KubeCluster and HelmCluster is

  1. KubeCluster start scheduler at where the client runs, the worker resources come from Kubernetes.
  2. HelmCluster has a long running scheduler pod in Kubernetes cluster.

My requirement is, I hope there's a long running scheduler in the cluster and multiple clients can connects this scheduler to submit tasks, the worker resources can come from same kubernetes cluster as scheduler and they can be scale up and down based on the load like what KubeCluster provides.

Seems it's a combination of KubeCluster and HelmCluster. Did community consider this case when we add Kubernetes support? Is there any technical blockers? If that's something reasonable, I can help work on this feature request

Jeffwan avatar Nov 01 '20 06:11 Jeffwan

Thanks for raising this @Jeffwan.

A couple of corrections to your understanding:

  1. KubeCluster creates a new Dask cluster when it is instantiated. The scheduler can either be local to the Client or remote inside the cluster with the deploy_mode kwarg. But the lifecycle of the cluster is tied to the script that instantiated it. If the script exits the cluster is cleaned up during garbage collection.
  2. HelmCluster is to allow connecting to an existing cluster which was created via the Dask Helm Chart. The goal being to allow folks to access logs and manually scale the cluster in a way that is consistent with other cluster managers. But HelmCluster does not create or destroy the cluster.

In Dask terminology we refer to KubeCluster as being an ephemeral cluster option, and HelmCluster being a fixed cluster option. You may find this blog post of interest.

Running a single cluster with multiple clients connected is generally discouraged as Dask has no concepts to ensure fair usage. Tasks are executed on a first-come-first-served bases and this makes it very possible for a single client to hog a cluster.

Generally for these use cases we recommend each client creates their own ephemeral cluster with KubeCluster and set it to adaptive. One thing we have discussed (but I don't think implemented) is ensuring each additional worker pod created with KubeCluster will have a decreasing priority. This would allow k8s to effectively balance the available resource.

You're not the first to ask about adding adaptive functionality to HelmCluster. However, we have intentionally disabled that functionality due to a technical blocker.

https://github.com/dask/dask-kubernetes/blob/ccb9864d463a3e14988af53a947a658cf32e7e10/dask_kubernetes/helm.py#L246-L256

The problem is that Dask workers are stateful, and the Dask scheduler manages that state. When an adaptive cluster decides to scale down state is intentionally removed from a worker before removing it, similar to draining a k8s node before removing it. However for the Helm Chart we use a Deployment resource for workers and can only set the desired number of replicas. So when removing a worker Dask would drain the state from one Pod, tell the worker process to exit, and then decrement the number of replicas in the deployment. Typically this causes a race condition where k8s can restart the worker before the new number of replicas updates and then remove a different pod when it does, resulting in lost state.

This is not a factor in KubeCluster because it creates Pod resources directly and manages their lifecycle fully.

I would be really interested to hear your thoughts on this. Every assumption here is definitely up for debate. It would also be great to hear more about your use case so we can take that into consideration.

jacobtomlinson avatar Nov 02 '20 10:11 jacobtomlinson

Hi @jacobtomlinson - I wanted to piggyback off of this exact question to perhaps add some clarity towards people who are looking for Dask as a small-business solution to schedule workflows. By the way, thanks for everything you have done - we need more people like you.

I am at a crossroads for my small business to deploy Dask as a way for our projected ~10 analysts to execute long-running Python computations. Here's the workflow that I run:

  • Someone submits their code through our Admin interface
  • That code is sent to our Django Webserver pod running inside of Kubernetes
  • Code is to be processed, depending on what the user specifies, by either threads or processes depending on if the GIL is released (such as a Dask-DF operation)
  • The Number of Workers is known beforehand (our analysts have to specify how many processes/threads they want)

My Attempts: I initially have three ways towards setting up our infrastructure:

  1. Launch the Dask-Helm chart and enable Horizontal Autoscaling by setting a metric to scale off of CPU as shown in articles like these: https://levelup.gitconnected.com/deploy-and-scale-your-dask-cluster-with-kubernetes-9e43f7e24b04
  • I quickly realized that, according to the API, https://kubernetes.dask.org/en/latest/api.html#dask_kubernetes.HelmCluster.scale, workers are not terminated gracefully, so this won't work.
  1. Launch the Dask-Helm chart and use my database to keep a count of how many workers I need and how many workers are active (so a Database Push before and After each Dask Process) and manually scale that way using client.cluster.scale(). Problem is, workers are again not terminated gracefully and a running task could be terminated instead.
  2. Using Dask-Kubernetes as you've outlined in this post and as I try and see if its right for us below.

The Actual Question: I was wondering if this was the right way to do it, starting from where I left off using KubeCluster:

  • Code is sent to my Django Webserver inside of a Kubernetes pod
  • Create a new KubeCluster using a worker-spec for that specific task, and in that case I can define whether I want larger workers for more threads or small workers for more processes, using something like this:
pod_spec = make_pod_spec(image='daskdev/dask:latest',
                          memory_limit='4G', memory_request='4G',
                          cpu_limit=1, cpu_request=1,
                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
cluster = KubeCluster(pod_spec)
cluster.scale(10)
  • Scale the Kube Cluster to how much resources was defined by our analyst.
  • Let Google Kubernetes Engine handle scaling nodes to create space for the Kube Cluster
  • Close the Kube Cluster by calling cluster.close() and client.close() when that task is done.
  • Therefore, we don't handle scaling to Kubernetes, but keep it all within Dask.

Will spread the love if this is answered and I've understood that the last implementation as I outlined is the way to go! If I wrote something confusing, I'll be more than happy to correct myself.

omarsumadi avatar Jan 07 '21 18:01 omarsumadi

Thanks for the comment @omarsumadi (although it should probably be a new issue).

Thanks for the praise! And to answer your question, yes that workflow sounds totally reasonable. Although I would like to point you to Prefect which is a workflow management tool built on Dask. It sounds like you are trying to build the same thing.

cc @jcrist

jacobtomlinson avatar Jan 08 '21 12:01 jacobtomlinson

@jacobtomlinson Hey Jacob - ok great, that takes a lot of weight off my shoulders.

I'm new to Github, so I didn't want to make a new issue because it didn't fall under any of the categories of:

  • Bug Report, Feature Request, Security Vulnerability. General questions linked to Stack Overflow, so not sure what I would have classified this 'question' under any of the issue types.

What would you suggest doing next time if something like this came up? Also, you should let people sponsor you on Github!

Thanks!

Oh and about Perfect - I'll look into this. I use the word 'potential' analysts very strongly, as in, we are looking to get some people on board but still reaching out to funding. I'll reach out when the time comes hopefully, but nothing is in the bag right now!

omarsumadi avatar Jan 08 '21 14:01 omarsumadi

Sure I understand that. This is probably the kind of question that would be asked on a forum, which we have discussed creating in the past.

Also, you should let people sponsor you on Github!

That's very kind of you, but I'll leave my employer to do the sponsoring 😄.

If you want to give back to the Dask community you can donate via NumFocus.

jacobtomlinson avatar Jan 08 '21 14:01 jacobtomlinson

@jacobtomlinson I've been thinking about what adaptive scaling for HelmCluster could look like, curious what you think about a hybrid approach of the two current options (and also whether it overlaps or is even totally redundant with what you're thinking about in https://github.com/dask/dask-kubernetes/pull/318 https://github.com/dask/distributed/pull/4605 etc).

Basically I was imagining that the scheduler would be managed externally by Helm, but would start with either no workers or a very limited static worker deployment. Then the cluster object would connect to the existing scheduler like HelmCluster but create+autoscale pods like KubeCluster. For us the benefits of this compared to just using KubeCluster would be 1) we can still use our existing simple helm setup in cases where we don't care about autoscaling, and 2) we have some tasks that require worker resources and getting autoscaling working for that seems extra complicated, so I was thinking this would let us autoscale "simple" non-resource-constrained work and manually add workers with resources when need.

bnaul avatar Mar 27 '21 14:03 bnaul

Thanks for sharing your interest here @bnaul.

I think rather than trying to build some hybrid the best way to move forward would be a Dask operator for Kubernetes. We could have a DaskCluster resource which behaves much like a deployment but with intelligence around worker state.

Then we could shuffle both the Helm Chart and KubeCluster over to use this same resource. There is already some work in dask-gateway on an operator so I would imagine extending that. This would hugely reduce complexity in this project and allow much better code reuse than we have today.

jacobtomlinson avatar Mar 29 '21 08:03 jacobtomlinson

The HelmCluster was removed in https://github.com/dask/dask-kubernetes/pull/890. Closing.

jacobtomlinson avatar Apr 30 '24 15:04 jacobtomlinson