
Issue with AdaptiveScaling using Dask Kubernetes Operator

Open naveenrb98 opened this issue 1 year ago • 9 comments

I recently started using Dask on Kubernetes with the operator, which I installed via Helm, and noticed that workers are constantly killed during scale up/down. I came across issues that were already registered and a PR merged 5 days ago: https://github.com/dask/dask-kubernetes/pull/649. Since that fix has been merged, I removed the current operator and installed version 2023.3.0, but I am still facing the same issue when autoscaling. So I was wondering whether the fix is available via Helm?
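For context, the cluster is created along these lines (a minimal sketch; the cluster name, image, and worker counts are placeholders):

from dask_kubernetes.operator import KubeCluster

# Create a cluster managed by the operator installed via Helm
cluster = KubeCluster(
    name="example-cluster",            # placeholder name
    image="ghcr.io/dask/dask:latest",  # placeholder image
    n_workers=3,
)

# Enable adaptive scaling; the operator creates a DaskAutoscaler resource
# that the controller reconciles against the scheduler's desired worker count
cluster.adapt(minimum=3, maximum=10)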

naveenrb98 avatar Mar 13 '23 07:03 naveenrb98

The fix should be available in 2023.3.0, and @jmif has been testing it in production for a while with good results.

Could you share more of a reproducer with an example of what you are running and what behaviour you are seeing?

jacobtomlinson avatar Mar 13 '23 11:03 jacobtomlinson

We are running Dask on Kubernetes with a minimum of 3 workers and a maximum of 10, with cluster autoscaling allowed. What we notice is that the scheduler sends scale-up requests very frequently and does not count the workers accurately; for example, when there were already more than 7 workers, the scheduler requested a scale down from 5 workers to 4. Because of these frequent scale-up/scale-down requests going through kopf, the workers were very unstable: they were spawned and killed almost simultaneously, without being given time to process anything, which resulted in poor performance. To address this I went through the controller code, which has a cooldown of 15 seconds. That seemed insufficient when you also deal with the cluster autoscaler, since it has to bring up nodes and then schedule pods, so I came up with a different flow (code below) to keep frequent scaling from happening.

import time

import kopf
import kubernetes_asyncio as kubernetes
from kubernetes_asyncio.client.rest import ApiException

# These helpers come from the operator's controller module (their location as of
# dask-kubernetes 2023.3.0 is assumed here; adjust the import if it has moved):
from dask_kubernetes.operator.controller.controller import (
    DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION,
    SchedulerCommError,
    get_desired_workers,
)

# Remembers the last scale-up/scale-down request issued by this handler
adapt_state = {"scaleup": None, "scaledown": None}


@kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
    async with kubernetes.client.api_client.ApiClient() as api_client:
        coreapi = kubernetes.client.CoreV1Api(api_client)

        pod_ready = False
        try:
            scheduler_pod = await coreapi.read_namespaced_pod(
                f"{spec['cluster']}-scheduler", namespace
            )
            if scheduler_pod.status.phase == "Running":
                pod_ready = True
        except ApiException as e:
            if e.status != 404:
                raise e

        if not pod_ready:
            logger.info("Scheduler not ready, skipping autoscaling")
            return

        customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
        # Use JSON merge-patch for the patch_* calls below
        customobjectsapi.api_client.set_default_header(
            "content-type", "application/merge-patch+json"
        )

        autoscaler_resource = await customobjectsapi.get_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskautoscalers",
            namespace=namespace,
            name=name,
        )

        worker_group_resource = await customobjectsapi.get_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskworkergroups",
            namespace=namespace,
            name=f"{spec['cluster']}-default",
        )

        current_replicas = int(worker_group_resource["spec"]["worker"]["replicas"])
        # Cooldown deadline recorded on the autoscaler resource (defaults to "now")
        cooldown_until = float(
            autoscaler_resource.get("metadata", {})
            .get("annotations", {})
            .get(DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION, time.time())
        )

        # Upstream cooldown check, disabled here in favour of the throttling below
        # if time.time() < cooldown_until:
        #     logger.info("Autoscaler for %s is in cooldown", spec["cluster"])
        #     return

        # Ask the scheduler for the desired number of workers
        try:
            desired_workers = await get_desired_workers(
                scheduler_service_name=f"{spec['cluster']}-scheduler",
                namespace=namespace,
                logger=logger,
            )
        except SchedulerCommError:
            logger.error("Unable to get desired number of workers from scheduler.")
            return

        # Ensure the desired number is within the min and max
        desired_workers = max(spec["minimum"], desired_workers)
        desired_workers = min(spec["maximum"], desired_workers)

        # Cap each scale down at 25% of the current replicas (at least 1 worker)
        if current_replicas > 0:
            max_scale_down = max(1, int(current_replicas * 0.25))
            desired_workers = max(current_replicas - max_scale_down, desired_workers)

        # Update the default DaskWorkerGroup
        if desired_workers != current_replicas:
            if desired_workers > current_replicas:
                if adapt_state['scaleup'] is not None:
                    # Throttle scale ups to at most one every 60 seconds
                    if (time.time() - adapt_state['scaleup']['last_request']) < 60:
                        logger.info(f"60 seconds not elapsed {adapt_state}")
                        return
                    # A scale up to a larger size is already in progress; skip this one
                    if adapt_state['scaleup']['desired_size'] > desired_workers:
                        logger.info(f"60 seconds elapsed but a larger scale up is already in progress {adapt_state}")
                        return
                    # if time.time() < cooldown_until:
                    #     if (cooldown_until - time.time()) > 180:
                    #         logger.info(f"Remaining cooldown {(cooldown_until - time.time())}")
                    #         logger.info("Autoscaler for %s is in cooldown, cannot scale up now", spec["cluster"])
                    #         return
                    adapt_state['scaleup'] = {'desired_size': desired_workers, 'last_request': time.time()}
                    await customobjectsapi.patch_namespaced_custom_object_scale(
                        group="kubernetes.dask.org",
                        version="v1",
                        plural="daskworkergroups",
                        namespace=namespace,
                        name=f"{spec['cluster']}-default",
                        body={"spec": {"replicas": desired_workers}},
                    )
                    # Record a 5 minute cooldown on the autoscaler resource
                    cooldown_until = time.time() + 300
                    await customobjectsapi.patch_namespaced_custom_object(
                        group="kubernetes.dask.org",
                        version="v1",
                        plural="daskautoscalers",
                        namespace=namespace,
                        name=name,
                        body={
                            "metadata": {
                                "annotations": {
                                    DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(
                                        cooldown_until
                                    )
                                }
                            }
                        },
                    )
                else:
                    # No scale up recorded yet; apply this first one immediately
                    await customobjectsapi.patch_namespaced_custom_object_scale(
                        group="kubernetes.dask.org",
                        version="v1",
                        plural="daskworkergroups",
                        namespace=namespace,
                        name=f"{spec['cluster']}-default",
                        body={"spec": {"replicas": desired_workers}},
                    )
                    cooldown_until = time.time() + 300
                    await customobjectsapi.patch_namespaced_custom_object(
                        group="kubernetes.dask.org",
                        version="v1",
                        plural="daskautoscalers",
                        namespace=namespace,
                        name=name,
                        body={
                            "metadata": {
                                "annotations": {
                                    DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(
                                        cooldown_until
                                    )
                                }
                            }
                        },
                    )
                    # Record this scale up so later iterations are throttled
                    adapt_state['scaleup'] = {'desired_size': desired_workers, 'last_request': time.time()}
            else:
                # if time.time() < cooldown_until:
                #     logger.info(f"Remaining cooldown {(cooldown_until - time.time())}")
                #     logger.info("Autoscaler for %s is in cooldown, cannot scale down now", spec["cluster"])
                #     return
                # else:
                # Hold off on scaling down for 4 minutes after the last scale up
                if adapt_state['scaleup'] is not None:
                    if (time.time() - adapt_state['scaleup']['last_request']) < 240:
                        adapt_state['scaledown'] = {'desired_size': desired_workers, 'last_request': time.time()}
                        logger.info(f"Last scale-up request arrived within the past 4 minutes; skipping scale down {adapt_state}")
                        return
                # Record the scale-down target and reset the scale-up marker
                if adapt_state['scaledown'] is None:
                    adapt_state['scaledown'] = {'desired_size': desired_workers, 'last_request': time.time()}
                    adapt_state['scaleup'] = {'desired_size': desired_workers, 'last_request': time.time()}
                await customobjectsapi.patch_namespaced_custom_object_scale(
                    group="kubernetes.dask.org",
                    version="v1",
                    plural="daskworkergroups",
                    namespace=namespace,
                    name=f"{spec['cluster']}-default",
                    body={"spec": {"replicas": adapt_state['scaledown']['desired_size']}},
                )

            logger.info(
                "Autoscaler updated %s worker count from %d to %d",
                spec["cluster"],
                current_replicas,
                desired_workers,
            )
        else:
            logger.debug(
                "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
            )

It's in very rough shape; I didn't have much time to refactor and prettify it. Basically, this is what I did to reduce frequent scaling, and the system is now a bit more stable than it was before. All of this was done with the cluster autoscaler in mind, and our use case requires Dask to run 24x7 with a varying workload.
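To make the intent clearer, the clamping and the scale-down cap in the code above boil down to roughly this (a standalone sketch; the helper name is mine):

def throttled_target(current_replicas: int, desired: int, minimum: int, maximum: int) -> int:
    """Clamp the scheduler's desired worker count and cap each scale-down step."""
    # Keep the target within the configured bounds
    desired = max(minimum, min(maximum, desired))
    # Never remove more than 25% of the current workers in one step (at least 1)
    if current_replicas > 0:
        max_scale_down = max(1, int(current_replicas * 0.25))
        desired = max(current_replicas - max_scale_down, desired)
    return desired


# With 8 running workers and a scheduler request for 1 worker, the first step
# only goes down to 6 (8 minus 25% of 8), rather than straight to the minimum of 3.
assert throttled_target(current_replicas=8, desired=1, minimum=3, maximum=10) == 6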

naveenrb98 avatar Mar 13 '23 19:03 naveenrb98

Thanks @naveenrb98. Would you mind putting that code into a PR so that we can see the diff more clearly and review the changes? Don't worry if it's rough, just mark it as a draft. It makes it easier to discuss in PR format though.

jacobtomlinson avatar Mar 14 '23 11:03 jacobtomlinson

I've opened a PR as requested. https://github.com/dask/dask-kubernetes/pull/675

naveenrb98 avatar Mar 14 '23 11:03 naveenrb98

Also, scaling is not happening properly: a lot of scale downs have been requested, yet there are still 4 worker pods running (see the attached screenshot).

naveenrb98 avatar Mar 14 '23 12:03 naveenrb98

@jmif would you mind taking a look at this issue and the related PR? I remember you talking about extending the cooldowns, and I'm guessing you ran into similar issues.

@naveenrb98 would you mind opening a separate issue for the "updated dask-cluster worker count from 4 to 3" problem? That sounds like a separate thing that we could take a look at quickly.

jacobtomlinson avatar Mar 14 '23 13:03 jacobtomlinson

I have opened up a new issue for improper scaling -> https://github.com/dask/dask-kubernetes/issues/677

naveenrb98 avatar Mar 14 '23 17:03 naveenrb98

Any updates on this issue? We've been holding off on switching to the Kubernetes operator because of these two issues, but since there haven't been any similar reports I'm wondering whether it might be somewhat isolated/specific to this use case...?

bnaul avatar Apr 10 '23 15:04 bnaul

@bnaul I'm not seeing other reports of this issue. I wonder if this is mainly isolated to 24/7 clusters.

jacobtomlinson avatar Apr 11 '23 20:04 jacobtomlinson