
Missing idleTimeout key in daskcluster_autoshutdown

Open timomaier opened this issue 1 year ago • 20 comments

Describe the issue:

My KubeClusters sometimes do not get shut down properly on kubernetes when they're done with their work. Kubernetes logs state that there's an exception in a kopf finalizer which is retried indefinitely, apparently due to the spec dict given to daskcluster_autoshutdown:

  Timer 'daskcluster_autoshutdown' failed with an exception. Will retry.
  Traceback (most recent call last):
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 276, in execute_handler_once
      result = await invoke_handler(
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 371, in invoke_handler
      result = await invocation.invoke(
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/invocation.py", line 116, in invoke
      result = await fn(**kwargs)  # type: ignore
    File "/usr/local/lib/python3.10/site-packages/dask_kubernetes/operator/controller/controller.py", line 852, in daskcluster_autoshutdown
      if spec["idleTimeout"]:
    File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 297, in __getitem__
      return resolve(self._src, self._path + (item,))
    File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 121, in resolve
      result = result[key]
  KeyError: 'idleTimeout'

When I remove these lines from the DaskCluster resource YAML in kubernetes, the problem is gone:

    finalizers:
      - kopf.zalando.org/KopfFinalizerMarker

Is it correct that daskcluster_autoshutdown as below receives spec as a specification dict, e.g. from make_cluster_spec(..., idle_timeout=5)? I tried explicitly adding the idle_timeout, but the problem persists:

@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
    if spec["idleTimeout"]:
        try:
            idle_since = await check_scheduler_idle(
                scheduler_service_name=f"{name}-scheduler",
                namespace=namespace,
                logger=logger,
            )
        except Exception:
            logger.warn("Unable to connect to scheduler, skipping autoshutdown check.")
            return
        if idle_since and time.time() > idle_since + spec["idleTimeout"]:
            cluster = await DaskCluster.get(name, namespace=namespace)
            await cluster.delete()

Not sure if this is a proper bug, or an issue with kopf, or anything is misconfigured on my end. Appreciate any help. I'd also be fine with just removing the timer/finalizer if that's possible.

Anything else we need to know?:

Environment:

  • Dask version: 2024.4.1
  • Dask operator version: 2024.4.0
  • Python version: 3.10.12
  • kopf python version: 1.37.1
  • Operating System: Linux
  • Install method (conda, pip, source): pip

timomaier avatar Apr 15 '24 13:04 timomaier

It looks like the idle timeout option isn't making it through to the resource in Kubernetes. Could you describe the cluster resource and ensure it is set correctly? Could you also ensure you have the latest version of the operator installed?

jacobtomlinson avatar Apr 15 '24 15:04 jacobtomlinson

This is the config yaml of the DaskCluster (I removed unnecessary parts), if this helps


apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  annotations:
    kopf.zalando.org/last-handled-configuration: >
      {"spec":} # Same spec dict as below
  creationTimestamp: '2024-04-15T15:37:26Z'
  finalizers:
    - kopf.zalando.org/KopfFinalizerMarker
  generation: 4
  managedFields:
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:spec:
          .: {}
          f:scheduler:
            .: {}
            f:service:
              .: {}
              f:ports:
                .: {}
                k:{"port":8786,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
                k:{"port":8787,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
              f:selector:
                .: {}
                f:dask.org/cluster-name: {}
                f:dask.org/component: {}
              f:type: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
          f:worker:
            .: {}
            f:replicas: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
              f:volumes: {}
        f:status:
          f:phase: {}
      manager: kr8s
      operation: Update
      time: '2024-04-15T15:37:26Z'
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:annotations:
            .: {}
            f:kopf.zalando.org/last-handled-configuration: {}
          f:finalizers:
            .: {}
            v:"kopf.zalando.org/KopfFinalizerMarker": {}
        f:status: {}
      manager: kopf
      operation: Update
      time: '2024-04-15T15:37:27Z'
  name: dask-cluster
  namespace: dask-operator
  resourceVersion: '712042645'
  uid: 3c7db72d-8f94-4904-b0c6-3e496f9b1ff6
spec:
  scheduler:
    ...
  worker:
    ...
status:
  phase: Running

The operator is running image ghcr.io/dask/dask-kubernetes-operator:2024.4.0, and a pip list inside the pod shows

Package            Version
------------------ -----------
...
dask               2024.4.0
dask-kubernetes    0+unknown
distributed        2024.4.0

I'm using dask in conjunction with prefect, and the creation of the KubeCluster is handed over to the prefect DaskTaskRunner, however the idle_timeout should be properly set via kwargs:


    spec = make_cluster_spec(
        name=f"dask-cluster-{getuser()}-{now}",
        # ...
        n_workers=n_workers,
        resources=resources,
        idle_timeout=5,
    )
    runner = DaskTaskRunner(
        cluster_class="dask_kubernetes.operator.KubeCluster",
        cluster_kwargs={
            "idle_timeout": 5,
            "custom_cluster_spec": spec,
            "namespace": "dask-operator",
        },
    )

Not sure if this is related somehow.

timomaier avatar Apr 15 '24 16:04 timomaier

This is strange, I don't see idleTimeout being set in your spec. But it should be being set in make_cluster_spec().

https://github.com/dask/dask-kubernetes/blob/b254dd27591847216bc76c57ebb9def48bf12c4d/dask_kubernetes/operator/kubecluster/kubecluster.py#L867

Can you confirm that idleTimeout is set in your spec variable?

jacobtomlinson avatar Apr 17 '24 08:04 jacobtomlinson

> pip list inside the pod shows

What about pip list from the machine you are running this code from?

jacobtomlinson avatar Apr 17 '24 08:04 jacobtomlinson

> What about pip list from the machine you are running this code from?

Package                    Version
-------------------------- -----------------
dask                       2024.4.1
dask-kubernetes            2024.4.0
distributed                2024.4.1

> Can you confirm that idleTimeout is set in your spec variable?

I can confirm, make_cluster_spec returns a dict like this

{
  'apiVersion': 'kubernetes.dask.org/v1',
  'kind': 'DaskCluster',
  'metadata': {'name': 'dask-cluster-2024-04-17_100447'},
  'spec': {'idleTimeout': 5, 'worker': {...}, 'scheduler': {...}}
}

The same spec dict is also present in the _custom_cluster_spec attribute of the KubeCluster instance after it is created. The idleTimeout attribute is also set in KubeCluster.

So, as you pointed out, it is set correctly but not passed to the resource properly. The worker and scheduler specs look perfectly fine though, it's just the idleTimeout that gets lost.

timomaier avatar Apr 17 '24 09:04 timomaier

Thanks for confirming. That dict gets passed straight to the create call, so there's nowhere for that key to get dropped in between. The only thing I can think is perhaps your CRDs are out of date and don't contain that property and so Kubernetes is silently dropping it. Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?
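
One way to test the stale-CRD theory is to inspect the schema the API server actually holds. The sketch below assumes the DaskCluster CRD has already been dumped and parsed into a Python dict (for example from the JSON output of `kubectl get crd`), and that it follows the standard `apiextensions.k8s.io/v1` layout. The helper name `crd_declares_field` and the sample dict are illustrative only; they are not part of dask-kubernetes.

```python
def crd_declares_field(crd: dict, field: str) -> dict:
    """Map each CRD version name to whether `spec.<field>` is declared in its schema."""
    result = {}
    for version in crd.get("spec", {}).get("versions", []):
        schema = version.get("schema", {}).get("openAPIV3Schema", {})
        spec_props = (
            schema.get("properties", {}).get("spec", {}).get("properties", {})
        )
        result[version["name"]] = field in spec_props
    return result


# Hand-written stand-in for an out-of-date CRD (not real operator output).
stale_crd = {
    "spec": {
        "versions": [
            {
                "name": "v1",
                "schema": {
                    "openAPIV3Schema": {
                        "properties": {
                            "spec": {
                                "properties": {"scheduler": {}, "worker": {}}
                            }
                        }
                    }
                },
            }
        ]
    }
}

print(crd_declares_field(stale_crd, "idleTimeout"))
```

If a version reports `False` for `idleTimeout`, an undeclared field in that schema is a plausible reason for the key to be pruned on creation. The helper only looks for an explicit declaration and does not account for schemas that preserve unknown fields.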

jacobtomlinson avatar Apr 18 '24 06:04 jacobtomlinson

I did uninstall the operator and made sure everything related to it is gone, followed this guide, installed again, but unfortunately the problem persists.

The exception is a bit annoying because it spams the logs of the kubernetes cluster, but it's not critical. My core issue was that resources were not deleted properly, but as a workaround I solved that by making sure to manually delete all deployments related to dask using the kubernetes python package.

I'm open to other suggestions, otherwise if other people do not see this problem feel free to close the issue. Thanks for the quick help so far!

timomaier avatar Apr 18 '24 11:04 timomaier

Yeah it's just strange that key is being dropped somewhere. I also feel like it may be specific to your setup because nobody else has reported it.

We could easily change spec["idleTimeout"] to spec.get("idleTimeout", 0) which would silence the log noise but not resolve the problem with things not timing out.
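
The effect of that change can be sketched in isolation. The helper below is hypothetical (it is not the operator's code) and uses a plain dict in place of the kopf `spec` object; it pairs the `.get("idleTimeout", 0)` guard with the idle comparison from the timer quoted at the top of the issue.

```python
import time


def should_shutdown(spec, idle_since, now=None):
    """Return True only when a timeout is configured and has elapsed.

    `spec` is a plain dict standing in for the kopf spec view (assumption).
    """
    idle_timeout = spec.get("idleTimeout", 0)  # missing key no longer raises
    if not idle_timeout:
        return False  # no timeout configured: never auto-shutdown
    if not idle_since:
        return False  # scheduler is not idle
    now = time.time() if now is None else now
    return now > idle_since + idle_timeout
```

With the key missing, `should_shutdown({}, idle_since)` returns `False` instead of raising `KeyError`, which is exactly the trade-off described: the log noise stops, but the cluster still never times out.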

jacobtomlinson avatar Apr 19 '24 09:04 jacobtomlinson

Hello,

I think I am facing the same issue.

None of my attempts to have the cluster automatically shut down after idle_seconds seems to work:

  • setting idle_timeout in the constructor of KubeCluster
  • setting idle_timeout in the make_cluster_spec function
  • setting the distributed.scheduler.idle-timeout dask config value

I used to get the same error as the OP before updating to the latest version of dask + operator. Now, I see the following log message every 5 seconds, but the cluster never shuts down:

Timer 'daskcluster_autoshutdown' succeeded.

If I describe the DaskCluster resource on my k8s cluster, then the idleTimeout key is missing from the spec.

Also, I have verified using debug breakpoints that data["spec"]["idleTimeout"] is properly set on this line. However, in the resulting DaskCluster object, cluster.spec.get("idleTimeout") is None and cluster.raw["spec"].get("idleTimeout") is None.

So as @jacobtomlinson said, it seems like this parameter is dropped somewhere in between the constructor call and the resource creation in the cluster. Uninstalling and re-installing the operator unfortunately did not fix the issue.


package versions: dask==2024.5.2 distributed==2024.5.2 dask-kubernetes==2024.5.0

dask-kubernetes-operator-2024.5.0 helm chart with app version 2022.4.1

tsanikgr avatar Jun 10 '24 11:06 tsanikgr

I'm having the same issue, idle_timeout doesn't work and I see the same error as in the OP over and over.

slevang avatar Feb 12 '25 21:02 slevang

> Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?

Can I just check that when folks are trying to resolve this that you are completely cleaning up the CRDs? Helm does not do this for you, you need to do it manually.

jacobtomlinson avatar Feb 13 '25 09:02 jacobtomlinson

Ok figured it out. I did the helm reinstall with a deletion of the CRDs, then was getting the same log as @tsanikgr but seeing idleTimeout: 0 in the spec despite having passed a different value.

I realized the issue was that I was using a custom_cluster_spec, but only passing idle_timeout to the KubeCluster constructor and not the creation of the spec. self.idle_timeout only gets used in this pathway and not the else branch. Maybe a slight improvement could be made there to avoid the confusion?
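
For anyone hitting the same thing, a minimal sketch of the workaround: put the timeout into the spec dict itself rather than relying on the `KubeCluster` keyword. The helper name `with_idle_timeout` is made up for illustration, and the sample dict mirrors the `make_cluster_spec` output shown earlier in this thread with the worker and scheduler sections left empty.

```python
import copy


def with_idle_timeout(cluster_spec, seconds):
    """Return a copy of a DaskCluster spec dict with spec.idleTimeout set."""
    updated = copy.deepcopy(cluster_spec)  # leave the caller's dict untouched
    updated.setdefault("spec", {})["idleTimeout"] = seconds
    return updated


# Stand-in for a custom spec that was built without idle_timeout.
custom_spec = {
    "apiVersion": "kubernetes.dask.org/v1",
    "kind": "DaskCluster",
    "metadata": {"name": "example"},
    "spec": {"worker": {}, "scheduler": {}},
}

patched = with_idle_timeout(custom_spec, 30)
print(patched["spec"]["idleTimeout"])
```

The patched dict is what would then be passed as `custom_cluster_spec`. This only addresses the ignored keyword; it does not cover a key that is dropped after the resource is created.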

slevang avatar Feb 13 '25 14:02 slevang

@slevang ah thanks for digging into this. That makes sense. I wonder what the better approach would be:

  1. If custom_cluster_spec is set we check that none of the other kwargs are set. If they are we raise an exception saying "Either use kwargs or a custom_cluster_spec, not both".
  2. If both kwargs and custom_cluster_spec are set we attempt to merge the two

As a user which of these behaviours would you find least surprising?

jacobtomlinson avatar Feb 14 '25 10:02 jacobtomlinson

  1. is probably least ambiguous but could be a major breaking change if we just start raising in this situation. Surely others are mixing and matching kwargs between these two objects. Warning that custom_cluster_spec takes precedence rather than raising would be a simple change, and agrees with the current docstring:

Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.

  2. would generally be a good UX but is maybe a little too magic.

slevang avatar Feb 15 '25 19:02 slevang

> Surely others are mix and matching kwargs between these two objects.

Sure, but doing so leads to this bug. I agree that an exception may be disruptive to people who are doing this. But warnings are easy to ignore. I guess it boils down to how problematic we think this issue is. Does setting both things lead to unexpected results from your Dask cluster?

In this situation I think it could lead to unexpected costs if users are relying on idle_timeout and it's not being set. This seems serious enough that maybe it should be an exception.

jacobtomlinson avatar Feb 24 '25 17:02 jacobtomlinson

So just to write up next steps in case anyone wants to pick this up:

In the KubeCluster object the custom_cluster_spec and most other kwargs are mutually exclusive. If you set custom_cluster_spec it will ignore other kwargs.

https://github.com/dask/dask-kubernetes/blob/30cc719850afb6a5d4482e0df867d2368db000e0/dask_kubernetes/operator/kubecluster/kubecluster.py#L343-L357

We should add a check to the __init__ of KubeCluster that raises an exception if both custom_cluster_spec and any of the kwargs that get passed to make_cluster_spec() are set. The exception should instruct folks to set these options when they create the custom spec.

https://github.com/dask/dask-kubernetes/blob/30cc719850afb6a5d4482e0df867d2368db000e0/dask_kubernetes/operator/kubecluster/kubecluster.py#L161-L184

We also need a test that confirms the exception is raised in cases where both custom_cluster_spec and idle_timeout are passed to KubeCluster.
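
A rough sketch of what such a check and its test could look like, written as a standalone function so it runs without a cluster. The name `check_exclusive_options` is hypothetical, and treating `None` as "not set" is an assumption: the real `KubeCluster.__init__` may fill defaults from the Dask config, so detecting which kwargs the user actually passed could need more care.

```python
def check_exclusive_options(custom_cluster_spec=None, **spec_kwargs):
    """Raise if a custom spec is combined with kwargs that would be ignored."""
    conflicting = sorted(k for k, v in spec_kwargs.items() if v is not None)
    if custom_cluster_spec is not None and conflicting:
        raise ValueError(
            "custom_cluster_spec cannot be combined with: "
            + ", ".join(conflicting)
            + ". Set these options when creating the custom spec instead."
        )


def test_custom_spec_and_idle_timeout_raises():
    # Mirrors the case from this issue: both options passed together.
    try:
        check_exclusive_options(custom_cluster_spec={"spec": {}}, idle_timeout=30)
    except ValueError as exc:
        assert "idle_timeout" in str(exc)
    else:
        raise AssertionError("expected ValueError")


test_custom_spec_and_idle_timeout_raises()
```

Passing only `custom_cluster_spec`, or only keyword options, goes through without an error.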

jacobtomlinson avatar Feb 25 '25 10:02 jacobtomlinson

@jacobtomlinson Not sure if I'm doing something wrong but if I pass the following, my idleTimeout is always set to 0:

cluster_spec = make_cluster_spec(name=cluster_name, idle_timeout=30)
cluster_spec["spec"]["worker"] = worker_spec
cluster_spec["spec"]["scheduler"] = scheduler_spec
cluster = KubeCluster(
    namespace="dev-dask",
    name=cluster_name,
    custom_cluster_spec=cluster_spec,
    idle_timeout=30,
)

I tried passing the value in either KubeCluster or make_cluster_spec, but 0 is still defined in the dask cluster resource's idleTimeout. I also reinstalled the CRDs. Any idea?

john-jam avatar Jun 13 '25 06:06 john-jam

@john-jam see the last comment I left

> In the KubeCluster object the custom_cluster_spec and most other kwargs are mutually exclusive. If you set custom_cluster_spec it will ignore other kwargs.

This is the bug that is described by this issue. When you set something that gets ignored it should raise an Exception, but it currently doesn't.

jacobtomlinson avatar Jun 13 '25 10:06 jacobtomlinson

@jacobtomlinson As indicated in my previous message, I also set the idle_timeout option through make_cluster_spec, but I see weird behavior where the timeout is sometimes defined and sometimes not (without changing anything in the cluster's creation code). I also observed cases where the config is defined but then the operator removes it or applies 0.

It's happening randomly and is quite tricky to identify why. No errors are shown in the operator logs.

The code I use, in case it helps (with a 2024.5.3 operator running):

import os
from dask_kubernetes.operator import make_worker_spec, make_scheduler_spec, make_cluster_spec, KubeCluster

if __name__ == '__main__':
    # Create the Dask cluster
    cluster_name = "idle-test"
    worker_spec = make_worker_spec(
        image="my-img-with-dask",
        n_workers=0,
        worker_command="dask worker --nthreads 1 --death-timeout 30 --memory-limit 0",
    )
    scheduler_spec = make_scheduler_spec(
        cluster_name=cluster_name,
        image="my-img-with-dask",

    )
    cluster_spec = make_cluster_spec(name=cluster_name, idle_timeout=40) # <-- Set the IDLE timeout here
    cluster_spec["spec"]["worker"] = worker_spec
    cluster_spec["spec"]["scheduler"] = scheduler_spec
    cluster = KubeCluster(
        namespace="my-ns",
        name=cluster_name,
        custom_cluster_spec=cluster_spec,
    )

    # Scale to the desired number of workers
    cluster.scale(2)

    # Close the process unexpectedly to not trigger dask graceful shutdown
    os.system('kill %d' % os.getpid())

john-jam avatar Jun 18 '25 02:06 john-jam

Thanks for that reproducer, that's really helpful.

jacobtomlinson avatar Jun 18 '25 07:06 jacobtomlinson