`SpecCluster` fails to remove grouped workers when they die, breaking adaptive scaling
When using `SpecCluster` with grouped workers (e.g., `JobQueueCluster` from dask-jobqueue with `processes > 1`; cf. dask/dask-jobqueue#498), dead workers are not removed from `self.workers` or `self.worker_spec`. The adaptive system therefore still counts the dead workers as "requested" when they have actually died, which prevents it from scaling up replacements.
Example
Apologies, it's not a full reproducer:
```python
# Using SLURMCluster (subclass of JobQueueCluster -> SpecCluster)
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    processes=8,  # creates grouped workers
    memory="32GB",
    walltime="01:00:00",
)
cluster.adapt(minimum=0, maximum=10)

# When jobs die (timeout, killed, etc.):
# - cluster.observed drops to 0 (scheduler sees no workers)
# - cluster.requested stays high (dead jobs still in self.workers)
# - Adaptive won't scale up new workers
```
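For context on why `requested` stays pinned at 32 in the log below: `requested` is derived from the jobs still present in `self.workers` (expanded by their group suffixes), while `observed` reflects what the scheduler actually sees. A rough sketch of the accounting, where the 32 comes from the log and everything else is illustrative:

```python
# Illustrative accounting only -- not distributed's actual implementation.
jobs_in_self_workers = 4    # dead jobs linger here and are never removed
processes_per_job = 8       # each job contributes 8 grouped workers

requested = jobs_in_self_workers * processes_per_job  # stays at 32
observed = 0                                          # scheduler sees none of them

# Adaptive's target is 32 and requested is already 32, so it concludes
# nothing new needs to be launched and the cluster never recovers.
```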
Log from `Adaptive` showing the issue:
```
target=32 len(self.plan)=32 len(self.observed)=15 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=14 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=12 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=11 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=10 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=9 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=8 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=7 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=6 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=5 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=4 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=3 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=2 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=1 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=0 len(self.requested)=32
```
Context
Setup: `JobQueueCluster` creates grouped workers when `processes > 1`:
```python
# In JobQueueCluster.__init__
if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1:
    worker["group"] = ["-" + str(i) for i in range(self._job_kwargs["processes"])]
```
This creates a `worker_spec` entry like:
python"cluster-0": {
"cls": SLURMJob,
"options": {...},
"group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]
}
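For illustration, the group suffixes above expand into the worker names that the scheduler reports, while `self.workers` and `self.worker_spec` stay keyed by the job name `"cluster-0"`. A standalone sketch (`expand` is a hypothetical helper, not distributed code):

```python
# Hypothetical helper, just to show the name expansion; not part of distributed.
spec = {"group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]}

def expand(job_name, spec):
    return [str(job_name) + suffix for suffix in spec["group"]]

print(expand("cluster-0", spec))
# ['cluster-0-0', 'cluster-0-1', ..., 'cluster-0-7']
```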
The Bug: When workers die, `_update_worker_status` receives expanded names (`"cluster-0-0"`) but looks them up in `self.workers`, which is keyed by job name (`"cluster-0"`):
```python
# Current broken implementation
def _update_worker_status(self, op, msg):
    if op == "remove":
        name = self.scheduler_info["workers"][msg]["name"]  # "cluster-0-0"

        def f():
            if name in self.workers:  # self.workers has "cluster-0", not "cluster-0-0"
                # This never executes for grouped workers!
                del self.workers[name]
```
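A minimal illustration of the mismatch (dictionary contents are made up to match the example above):

```python
# self.workers is keyed by job name; the scheduler reports expanded names.
workers = {"cluster-0": "<SLURMJob instance>"}
name = "cluster-0-0"        # name received from the scheduler

assert name not in workers  # lookup misses, so the dead job is never cleaned up
```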
Result:
- self.workers keeps dead job objects
- self.worker_spec keeps their specifications
- adaptive doesn't scale up
To fix this, `SpecCluster._update_worker_status` will need to:
- remove the worker from both `self.workers` and `self.worker_spec` when a remove signal is sent from the scheduler
- handle both grouped and non-grouped workers
A choice also needs to be made about how to interpret a removal signal for a single worker in a group (a small sketch of these options follows the list). Either:
- a group is removed when one member is removed
- a group is removed only when all members are removed
- a group is removed when some fraction of members are removed
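To make the options concrete, here is a small sketch of how the three policies could be expressed as a single predicate (the function name, signature, and default fraction are hypothetical):

```python
def group_should_be_removed(removed_members: set, group_size: int,
                            policy: str = "any", fraction: float = 0.5) -> bool:
    """Decide whether a whole worker group should be dropped.

    Hypothetical helper, not part of distributed.
    """
    if policy == "any":        # a group is removed when one member is removed
        return len(removed_members) >= 1
    if policy == "all":        # a group is removed only when all members are removed
        return len(removed_members) == group_size
    if policy == "fraction":   # a group is removed when some fraction is removed
        return len(removed_members) / group_size >= fraction
    raise ValueError(f"unknown policy: {policy!r}")
```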
Fix outline
Below is an outline of the fix I came up with. It implements the aggressive cleanup variant, where a whole group of workers is removed as soon as a remove signal is received for any single member of the group.
The comments GROUPED, NON-GROUPED and BOTH tag code that is relevant to grouped workers, non-grouped workers, or both:
```python
import asyncio

import dask
from dask.utils import parse_timedelta


# Method on SpecCluster
def _update_worker_status(self, op, msg):
    if op == "remove":
        worker_name = self.scheduler_info["workers"][msg]["name"]

        def f():
            # BOTH: Check if this worker is truly gone from the scheduler
            if any(
                d.get("name") == worker_name
                for d in self.scheduler_info.get("workers", {}).values()
            ):
                return

            # BOTH: Iterate through all job specs to find which job owns this worker
            for job_name, spec in list(self.worker_spec.items()):
                worker_found = False

                # GROUPED: Handle jobs with multiple worker processes
                if "group" in spec:
                    # GROUPED: Build list of all worker names for this job
                    group_names = [str(job_name) + suffix for suffix in spec["group"]]
                    # GROUPED: Check if the removed worker belongs to this job
                    if worker_name in group_names:
                        worker_found = True
                        # GROUPED: Remove entire job when ANY worker dies
                        # Assumption: partial failure means total failure
                        if job_name in self.workers:
                            self._futures.add(
                                asyncio.ensure_future(self.workers[job_name].close())
                            )
                            del self.workers[job_name]
                        del self.worker_spec[job_name]

                # NON-GROUPED: Handle single-process jobs, where the worker
                # name matches the job name
                elif worker_name == job_name:
                    worker_found = True
                    # NON-GROUPED: Remove immediately (only one worker anyway)
                    if job_name in self.workers:
                        self._futures.add(
                            asyncio.ensure_future(self.workers[job_name].close())
                        )
                        del self.workers[job_name]
                    del self.worker_spec[job_name]

                # BOTH: Stop searching once we've found and handled the worker
                if worker_found:
                    break

        # BOTH: Wait before removing to handle temporary disconnections
        delay = parse_timedelta(
            dask.config.get("distributed.deploy.lost-worker-timeout")
        )
        asyncio.get_running_loop().call_later(delay, f)

    # BOTH: Call parent implementation for any additional handling
    super()._update_worker_status(op, msg)
```
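As a usage note, the delay before `f()` runs is taken from the existing `distributed.deploy.lost-worker-timeout` config value (already used by the current implementation), so tolerance for brief disconnections stays tunable without code changes. For example (the value is only illustrative):

```python
import dask

dask.config.set({"distributed.deploy.lost-worker-timeout": "15s"})
```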
Once we make a choice about how to interpret the removal signal for a grouped worker, I'm happy to go ahead with a PR; I just need to know what the maintainers here prefer.
We should probably fix #9103 while we're at it.