
Scheduler plugin to scale worker nodes

davzucky opened this issue 2 years ago · 7 comments

We are currently running Dask on Kubernetes. At the moment we scale the number of workers with an HPA driven by the scheduler's Prometheus metrics, which report the desired number of workers. However, this is not the best way to scale: sometimes the worker that is currently processing work gets killed, and because we use an HPA we always have at least one worker up.

The idea of the plugin is the following:

  • Initialize with
    • Worker namespace
    • Worker deployment
    • Average time
    • min workers
    • max workers
  • Scale the number of worker replicas in the deployment or ReplicaSet from the plugin

This would be a mixed mode where the deployment of the scheduler and workers comes from your own setup, but you have the ability to scale using this plugin; a rough sketch of the scaling step is shown below.
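A minimal sketch of the scaling step the plugin would perform, assuming the official `kubernetes` Python client is available in the scheduler pod (the config class and function names are only illustrative):

```python
from dataclasses import dataclass

from kubernetes import client, config


@dataclass
class WorkerScalerConfig:
    namespace: str        # worker namespace
    deployment: str       # worker deployment name
    average_time: float   # averaging window (seconds) for the desired-worker metric
    min_workers: int
    max_workers: int


def scale_workers(cfg: WorkerScalerConfig, desired: int) -> None:
    """Clamp the desired worker count and patch the Deployment's replica count."""
    replicas = max(cfg.min_workers, min(desired, cfg.max_workers))
    config.load_incluster_config()  # the plugin runs inside the cluster
    client.AppsV1Api().patch_namespaced_deployment_scale(
        name=cfg.deployment,
        namespace=cfg.namespace,
        body={"spec": {"replicas": replicas}},
    )
```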

Before starting development, I would like to know what the community thinks about this.

davzucky avatar Nov 27 '21 04:11 davzucky

I definitely recommend against using HPAs for Dask. Workers have state and HPAs are generally only good at scaling stateless things.

Typically we autoscale Dask clusters using Adaptive, which has some default heuristics but can also be subclassed by users with custom heuristics. It sounds like you want to read those docs and implement a custom Adaptive class yourself.
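For reference, a minimal sketch of a custom Adaptive subclass (the clamping heuristic here is only illustrative); you would then hand this class to your cluster's `adapt()` call:

```python
from distributed.deploy import Adaptive


class ClampedAdaptive(Adaptive):
    """Ask the scheduler for its suggested worker count, then clamp it."""

    async def target(self) -> int:
        # The default implementation asks the scheduler for its estimate
        # based on current load; here we only bound the result.
        suggested = await super().target()
        return max(1, min(suggested, 20))
```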

I would still advise against using Kubernetes primitives like Deployments because they have no concept of state. You may be able to do something with StatefulSets but it still feels like a minefield.

We are about to start some work in the new year on building an official operator which will remove a lot of this pain.

jacobtomlinson avatar Nov 29 '21 09:11 jacobtomlinson

@jacobtomlinson Thank you for your reply. I looked at the code and I think I have a clear idea of how to do this using Adaptive and a plugin, and I would like your feedback on it. The idea is to create a scheduler plugin that extends SchedulerPlugin. In the async def start(self, scheduler) method, since I get the scheduler, I can create an instance of KubeCluster initialized with that scheduler; looking at the code of SpecCluster, if you pass an existing scheduler instance it will use it rather than instantiating its own. Once I have the KubeCluster I can just call its adapt method, which does all the work for me, and in the plugin I only have to ensure I stop and close the KubeCluster. With that setup I should be able to have a long-running Dask scheduler that dynamically scales workers without relying on the Kubernetes HPA, roughly as sketched below.
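A rough sketch of what I have in mind; whether `KubeCluster` really accepts an existing scheduler this way is the assumption I still need to verify, while the `SchedulerPlugin` hooks themselves are standard:

```python
from distributed.diagnostics.plugin import SchedulerPlugin


class KubeAdaptivePlugin(SchedulerPlugin):
    """Attach a KubeCluster to a long-running scheduler and let adapt() scale it."""

    def __init__(self, minimum: int = 0, maximum: int = 10):
        self.minimum = minimum
        self.maximum = maximum
        self.cluster = None

    async def start(self, scheduler):
        # Imported lazily so the scheduler does not need dask-kubernetes
        # unless the plugin is actually used.
        from dask_kubernetes import KubeCluster

        # Assumption: passing the running scheduler makes KubeCluster manage
        # workers only, the way SpecCluster does when given a scheduler.
        self.cluster = await KubeCluster(scheduler=scheduler, asynchronous=True)
        self.cluster.adapt(minimum=self.minimum, maximum=self.maximum)

    async def close(self):
        if self.cluster is not None:
            await self.cluster.close()
```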

davzucky avatar Jan 05 '22 00:01 davzucky

Sounds like a reasonable approach. We are about to start work on an operator which will solve this problem too.

jacobtomlinson avatar Jan 05 '22 10:01 jacobtomlinson

@jacobtomlinson, I will start working on that plugin. Happy to hear that you are looking at implementing an operator. Do you have a repo/issue that I can follow? I know from experience that many groups use Dask differently, which can make using an operator hard. In my company, for example, we use OpenShift but we don't have the admin role, which means we cannot add an operator, since that often requires new CRDs. This is a pain, and it is why I'm also looking at being able to create workers on the fly from a long-running scheduler.

Would you also know how I could create pods with different setups based on Dask tags? At the moment we have 3 types of workers, but we only create one type of them, sized for the maximum capacity:

  1. IO-bound workers, which fetch data; usually we want a lot of them, with low CPU and standard memory
  2. Heavy-memory workers, which require one dedicated CPU and a lot of memory
  3. GPU-based workers; these are static and used for training

Even today with KubeCluster this is not something you can specify. Do you think tag-based pod setups could be useful in the future?
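For context, the closest mechanism I know of today is worker resources plus per-task resource requests, roughly like the sketch below (it assumes each worker pool is started with a matching `--resources` flag, e.g. `GPU=1` for the GPU pool); this routes tasks to the right workers but still does not let each pool have its own pod spec:

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # address is illustrative


def fetch(url: str) -> bytes:
    import urllib.request
    return urllib.request.urlopen(url).read()


def train(data: bytes) -> int:
    return len(data)  # placeholder for real GPU training


# IO-bound task: no resource restriction, any worker can run it.
data = client.submit(fetch, "https://example.com/data")

# GPU task: only scheduled on workers started with --resources "GPU=1".
model = client.submit(train, data, resources={"GPU": 1})
```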

davzucky avatar Jan 06 '22 00:01 davzucky

The issue to track is #256.

Yeah it's a challenge when users can't install CRDs, but my hope is that cluster admins will be able to install it for them. That's what commonly happens.

If you mean having a mix of different workers in a single Dask cluster then yeah we don't support that today in many of the cluster managers. But it's something we are keen to improve in the future.

jacobtomlinson avatar Jan 07 '22 11:01 jacobtomlinson

> Yeah it's a challenge when users can't install CRDs, but my hope is that cluster admins will be able to install it for them. That's what commonly happens.

Yes, I know the admins and they would do it; however, it could take up to 6 months to go through the process :-(

> If you mean having a mix of different workers in a single Dask cluster then yeah we don't support that today in many of the cluster managers. But it's something we are keen to improve in the future.

Yes, this is what I mean. It would be nice if distributed could handle that in the adaptive logic.

davzucky avatar Jan 09 '22 01:01 davzucky

> It would be nice if distributed could handle that in the adaptive logic.

It would. @madsbk do you have any thoughts on updating the adaptive code to be aware of task annotations and scale different worker groups based on that? Could be fun!

jacobtomlinson avatar Jan 10 '22 10:01 jacobtomlinson

Nothing to do here. Closing.

jacobtomlinson avatar Apr 30 '24 15:04 jacobtomlinson