Missing idleTimeout key in daskcluster_autoshutdown
Describe the issue:
My KubeClusters sometimes do not get shut down properly on kubernetes when they're done with their work. Kubernetes logs state that there's an exception in a kopf finalizer which is retried indefinitely, apparently due to the spec dict given to daskcluster_autoshutdown:
Timer 'daskcluster_autoshutdown' failed with an exception. Will retry.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 276, in execute_handler_once
    result = await invoke_handler(
  File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 371, in invoke_handler
    result = await invocation.invoke(
  File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/invocation.py", line 116, in invoke
    result = await fn(**kwargs) # type: ignore
  File "/usr/local/lib/python3.10/site-packages/dask_kubernetes/operator/controller/controller.py", line 852, in daskcluster_autoshutdown
    if spec["idleTimeout"]:
  File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 297, in __getitem__
    return resolve(self._src, self._path + (item,))
  File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 121, in resolve
    result = result[key]
KeyError: 'idleTimeout'
When I remove these lines from the DaskCluster resource YAML in Kubernetes, the problem goes away:
finalizers:
- kopf.zalando.org/KopfFinalizerMarker
Is it correct that daskcluster_autoshutdown as below receives spec as a specification dict, e.g. from make_cluster_spec(..., idle_timeout=5)? I tried explicitly adding the idle_timeout, but the problem persists.
@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
    if spec["idleTimeout"]:
        try:
            idle_since = await check_scheduler_idle(
                scheduler_service_name=f"{name}-scheduler",
                namespace=namespace,
                logger=logger,
            )
        except Exception:
            logger.warn("Unable to connect to scheduler, skipping autoshutdown check.")
            return
        if idle_since and time.time() > idle_since + spec["idleTimeout"]:
            cluster = await DaskCluster.get(name, namespace=namespace)
            await cluster.delete()
Not sure if this is a proper bug, an issue with kopf, or whether something is misconfigured on my end. I'd appreciate any help. I'd also be fine with just removing the timer/finalizer if that's possible.
Anything else we need to know?:
Environment:
- Dask version: 2024.4.1
- Dask operator version: 2024.4.0
- Python version: 3.10.12
- kopf python version: 1.37.1
- Operating System: Linux
- Install method (conda, pip, source): pip
It looks like the idle timeout option isn't making it through to the resource in Kubernetes. Could you describe the cluster resource and ensure it is set correctly? Could you also ensure you have the latest version of the operator installed?
This is the config YAML of the DaskCluster (I removed unnecessary parts), if it helps:
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  annotations:
    kopf.zalando.org/last-handled-configuration: >
      {"spec":} # Same spec dict as below
  creationTimestamp: '2024-04-15T15:37:26Z'
  finalizers:
    - kopf.zalando.org/KopfFinalizerMarker
  generation: 4
  managedFields:
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:spec:
          .: {}
          f:scheduler:
            .: {}
            f:service:
              .: {}
              f:ports:
                .: {}
                k:{"port":8786,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
                k:{"port":8787,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
              f:selector:
                .: {}
                f:dask.org/cluster-name: {}
                f:dask.org/component: {}
              f:type: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
          f:worker:
            .: {}
            f:replicas: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
              f:volumes: {}
        f:status:
          f:phase: {}
      manager: kr8s
      operation: Update
      time: '2024-04-15T15:37:26Z'
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:annotations:
            .: {}
            f:kopf.zalando.org/last-handled-configuration: {}
          f:finalizers:
            .: {}
            v:"kopf.zalando.org/KopfFinalizerMarker": {}
        f:status: {}
      manager: kopf
      operation: Update
      time: '2024-04-15T15:37:27Z'
  name: dask-cluster
  namespace: dask-operator
  resourceVersion: '712042645'
  uid: 3c7db72d-8f94-4904-b0c6-3e496f9b1ff6
spec:
  scheduler:
    ...
  worker:
    ...
status:
  phase: Running
The operator is running image ghcr.io/dask/dask-kubernetes-operator:2024.4.0, and a pip list inside the pod shows
Package Version
------------------ -----------
...
dask 2024.4.0
dask-kubernetes 0+unknown
distributed 2024.4.0
I'm using dask in conjunction with prefect, and the creation of the KubeCluster is handed over to the prefect DaskTaskRunner; however, the idle_timeout should be properly set via kwargs:
spec = make_cluster_spec(
    name=f"dask-cluster-{getuser()}-{now}",
    # ...
    n_workers=n_workers,
    resources=resources,
    idle_timeout=5,
)
runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.KubeCluster",
    cluster_kwargs={
        "idle_timeout": 5,
        "custom_cluster_spec": spec,
        "namespace": "dask-operator",
    },
)
Not sure if this is related somehow.
This is strange, I don't see idleTimeout being set in your spec. But it should be set by make_cluster_spec().
https://github.com/dask/dask-kubernetes/blob/b254dd27591847216bc76c57ebb9def48bf12c4d/dask_kubernetes/operator/kubecluster/kubecluster.py#L867
Can you confirm that idleTimeout is set in your spec variable?
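For example, something along these lines (just a quick sanity check, adjust the cluster name to match yours):

from dask_kubernetes.operator import make_cluster_spec

spec = make_cluster_spec(name="dask-cluster", idle_timeout=5)
# Expect this to print 5 if the option is making it into the spec dict
print(spec["spec"].get("idleTimeout"))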
pip list inside the pod shows
What about pip list from the machine you are running this code from?
What about pip list from the machine you are running this code from?
Package Version
-------------------------- -----------------
dask 2024.4.1
dask-kubernetes 2024.4.0
distributed 2024.4.1
Can you confirm that idleTimeout is set in your spec variable?
I can confirm; make_cluster_spec returns a dict like this:
{
    'apiVersion': 'kubernetes.dask.org/v1',
    'kind': 'DaskCluster',
    'metadata': {'name': 'dask-cluster-2024-04-17_100447'},
    'spec': {'idleTimeout': 5, 'worker': {...}, 'scheduler': {...}}
}
The same spec dict is also present in the _custom_cluster_spec attribute of the KubeCluster instance after it is created. The idleTimeout attribute is also set on the KubeCluster.
So, as you pointed out, it is set correctly but not passed on to the resource properly. The worker and scheduler specs look perfectly fine though; it's just the idleTimeout that gets lost.
Thanks for confirming. That dict gets passed straight to the create call, so there's nowhere for that key to get dropped in between. The only thing I can think is perhaps your CRDs are out of date and don't contain that property and so Kubernetes is silently dropping it. Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?
I did uninstall the operator and made sure everything related to it is gone, followed this guide, and installed again, but unfortunately the problem persists.
The exception is a bit annoying because it spams the logs of the Kubernetes cluster, but it's not critical. My core issue was that resources were not deleted properly; as a workaround I solved that by manually deleting all deployments related to dask using the kubernetes Python package.
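For reference, the cleanup looks roughly like this (a sketch only; loading the kubeconfig and selecting resources via the dask.org/cluster-name label are assumptions based on my setup, and the resource kind may need adjusting to whatever the operator actually creates for you):

from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() when running inside a pod
apps = client.AppsV1Api()

namespace = "dask-operator"
# Select everything the operator labelled with dask.org/cluster-name and delete it
deployments = apps.list_namespaced_deployment(
    namespace, label_selector="dask.org/cluster-name"
)
for deployment in deployments.items:
    apps.delete_namespaced_deployment(deployment.metadata.name, namespace)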
I'm open to other suggestions, otherwise if other people do not see this problem feel free to close the issue. Thanks for the quick help so far!
Yeah it's just strange that key is being dropped somewhere. I also feel like it may be specific to your setup because nobody else has reported it.
We could easily change spec["idleTimeout"] to spec.get("idleTimeout", 0) which would silence the log noise but not resolve the problem with things not timing out.
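For illustration, that change would look something like this (a sketch of the suggested guard, not a merged patch):

import kopf


@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
    # Treat a missing idleTimeout as 0 (disabled) instead of raising KeyError
    if spec.get("idleTimeout", 0):
        ...  # unchanged autoshutdown logic from above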
Hello,
I think I am facing the same issue.
None of my attempts to have the cluster automatically shut down after idle_timeout seconds seem to work (see the sketch after this list):
- setting idle_timeout in the constructor of KubeCluster
- setting idle_timeout in the make_cluster_spec function
- setting the distributed.scheduler.idle-timeout dask config value
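Roughly what I tried, for reference (sketched with placeholder names and values):

import dask
from dask_kubernetes.operator import KubeCluster, make_cluster_spec

# 1. idle_timeout in the KubeCluster constructor
cluster = KubeCluster(name="my-cluster", idle_timeout=60)

# 2. idle_timeout in make_cluster_spec
spec = make_cluster_spec(name="my-cluster", idle_timeout=60)
cluster = KubeCluster(custom_cluster_spec=spec)

# 3. the distributed.scheduler.idle-timeout dask config value
dask.config.set({"distributed.scheduler.idle-timeout": "60s"})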
I used to get the same error as the OP before updating to the latest version of dask + operator. Now, I see the following log message every 5 seconds, but the cluster never shuts down:
Timer 'daskcluster_autoshutdown' succeeded.
If I describe the DaskCluster resource on my k8s cluster, then the idleTimeout key is missing from the spec.
Also, I have verified using debug breakpoints that data["spec"]["idleTimeout"] is properly set on this line. However, in the resulting DaskCluster object, cluster.spec.get("idleTimeout") is None and cluster.raw["spec"].get("idleTimeout") is None.
So, as @jacobtomlinson said, it seems like this parameter is dropped somewhere between the constructor call and the resource creation in the cluster. Uninstalling and re-installing the operator unfortunately did not fix the issue.
package versions: dask==2024.5.2 distributed==2024.5.2 dask-kubernetes==2024.5.0
dask-kubernetes-operator-2024.5.0 helm chart with app version 2022.4.1
I'm having the same issue, idle_timeout doesn't work and I see the same error as in the OP over and over.
Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?
Can I just check that when folks are trying to resolve this, you are completely cleaning up the CRDs? Helm does not do this for you; you need to do it manually.
Ok, figured it out. I did the Helm reinstall with a deletion of the CRDs, then was getting the same log as @tsanikgr but seeing idleTimeout: 0 in the spec despite having passed a different value.
I realized the issue was that I was using a custom_cluster_spec, but only passing idle_timeout to the KubeCluster constructor and not the creation of the spec. self.idle_timeout only gets used in this pathway and not the else branch. Maybe a slight improvement could be made there to avoid the confusion?
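In other words, the fix on my end was to set the timeout on the spec itself, since the constructor kwarg is bypassed when a custom spec is given. A minimal sketch:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

# idle_timeout has to go into the spec; the KubeCluster kwarg is ignored
# when custom_cluster_spec is provided
spec = make_cluster_spec(name="my-cluster", idle_timeout=300)
cluster = KubeCluster(custom_cluster_spec=spec)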
@slevang ah thanks for digging into this. That makes sense. I wonder what the better approach would be:
1. If custom_cluster_spec is set we check that none of the other kwargs are set. If they are we raise an exception saying "Either use kwargs or a custom_cluster_spec, not both".
2. If both kwargs and custom_cluster_spec are set we attempt to merge the two.
As a user which of these behaviours would you find least surprising?
Option 1 is probably the least ambiguous but could be a major breaking change if we just start raising in this situation. Surely others are mixing and matching kwargs between these two objects. Warning that custom_cluster_spec takes precedence, rather than raising, would be a simple change and agrees with the current docstring:
"Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments."
Option 2 would generally be a good UX but is maybe a little too magic.
Surely others are mixing and matching kwargs between these two objects.
Sure, but doing so leads to this bug. I agree that an exception may be disruptive to people who are doing this. But warnings are easy to ignore. I guess it boils down to how problematic we think this issue is. Does setting both things lead to unexpected results from your Dask cluster?
In this situation I think it could lead to unexpected costs if users are relying on idle_timeout and it's not being set. This seems serious enough that maybe it should be an exception.
So just to write up next steps in case anyone wants to pick this up:
In the KubeCluster object the custom_cluster_spec and most other kwargs are mutually exclusive. If you set custom_cluster_spec it will ignore other kwargs.
https://github.com/dask/dask-kubernetes/blob/30cc719850afb6a5d4482e0df867d2368db000e0/dask_kubernetes/operator/kubecluster/kubecluster.py#L343-L357
We should add a check to the __init__ of KubeCluster that raises an exception if both custom_cluster_spec and any of the kwargs that get passed to make_cluster_spec() are set. The exception should instruct folks to set these options when they create the custom spec.
https://github.com/dask/dask-kubernetes/blob/30cc719850afb6a5d4482e0df867d2368db000e0/dask_kubernetes/operator/kubecluster/kubecluster.py#L161-L184
We also need a test that confirms the exception is raised in cases like custom_cluster_spec and idle_timeout both being passed to KubeCluster.
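Something along these lines could work (a rough, illustrative sketch; the real check would live in KubeCluster.__init__ and would need to account for the kwarg defaults so unset options are not flagged):

from typing import Any, Dict, Optional


def check_custom_spec_conflicts(
    custom_cluster_spec: Optional[Dict[str, Any]],
    **spec_kwargs: Any,
) -> None:
    """Raise if kwargs that belong to make_cluster_spec() are combined with a custom spec."""
    if custom_cluster_spec is None:
        return
    ignored = [name for name, value in spec_kwargs.items() if value is not None]
    if ignored:
        raise ValueError(
            f"These kwargs are ignored when custom_cluster_spec is set: {ignored}. "
            "Set them when building the spec instead, e.g. make_cluster_spec(idle_timeout=...)."
        )


# For example, the combination reported above would raise:
# check_custom_spec_conflicts(custom_cluster_spec=spec, idle_timeout=5)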
@jacobtomlinson Not sure if I'm doing something wrong but if I pass the following, my idleTimeout is always set to 0:
cluster_spec = make_cluster_spec(name=cluster_name, idle_timeout=30)
cluster_spec["spec"]["worker"] = worker_spec
cluster_spec["spec"]["scheduler"] = scheduler_spec
cluster = KubeCluster(
    namespace="dev-dask",
    name=cluster_name,
    custom_cluster_spec=cluster_spec,
    idle_timeout=30,
)
I tried passing the value to both KubeCluster and make_cluster_spec, but idleTimeout is still set to 0 in the DaskCluster resource. I also re-installed the CRDs. Any idea?
@john-jam see the last comment I left
In the KubeCluster object the custom_cluster_spec and most other kwargs are mutually exclusive. If you set custom_cluster_spec it will ignore other kwargs.
This is the bug that is described by this issue. When you set something that gets ignored it should raise an Exception, but it currently doesn't.
@jacobtomlinson As indicated in my previous message, I also send the idle_timeout option through make_cluster_spec, but I see a weird behavior where the timeout is sometimes defined and sometimes not (without changing anything in the cluster's creation code). I also observed cases where the config is defined but then the operator removes it or applies 0.
It happens randomly and is quite tricky to identify why. No errors are shown in the operator logs.
The code I use, if that can help (with a 2024.5.3 operator running):
import os
from dask_kubernetes.operator import make_worker_spec, make_scheduler_spec, make_cluster_spec, KubeCluster

if __name__ == '__main__':
    # Create the Dask cluster
    cluster_name = "idle-test"
    worker_spec = make_worker_spec(
        image="my-img-with-dask",
        n_workers=0,
        worker_command="dask worker --nthreads 1 --death-timeout 30 --memory-limit 0",
    )
    scheduler_spec = make_scheduler_spec(
        cluster_name=cluster_name,
        image="my-img-with-dask",
    )
    cluster_spec = make_cluster_spec(name=cluster_name, idle_timeout=40)  # <-- Set the IDLE timeout here
    cluster_spec["spec"]["worker"] = worker_spec
    cluster_spec["spec"]["scheduler"] = scheduler_spec
    cluster = KubeCluster(
        namespace="my-ns",
        name=cluster_name,
        custom_cluster_spec=cluster_spec,
    )

    # Scale to the desired number of workers
    cluster.scale(2)

    # Close the process unexpectedly to not trigger dask graceful shutdown
    os.system('kill %d' % os.getpid())
Thanks for that reproducer, that's really helpful.