
`SpecCluster` fails to remove grouped workers when they die, breaking adaptive scaling

Open · alisterburt opened this issue on Aug 26 '25 · 2 comments

When using SpecCluster with grouped workers (e.g. JobQueueCluster from dask-jobqueue with processes > 1, cf. dask/dask-jobqueue#498), dead workers are not removed from self.workers or self.worker_spec. This causes the adaptive system to incorrectly believe the workers are still "requested" when they have actually died, preventing scale-up.

Example

Apologies, this isn't a full reproducer:

# Using SLURMCluster (subclass of JobQueueCluster -> SpecCluster)
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    processes=8,  # Creates grouped workers
    memory="32GB",
    walltime="01:00:00"
)
cluster.adapt(minimum=0, maximum=10)

# When jobs die (timeout, killed, etc.):
# - cluster.observed drops to 0 (scheduler sees no workers)
# - cluster.requested stays high (dead jobs still in self.workers)
# - Adaptive won't scale up new workers

Log from Adaptive showing the issue:

target=32 len(self.plan)=32 len(self.observed)=15 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=14 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=12 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=11 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=10 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=9 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=8 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=7 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=6 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=5 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=4 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=3 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=2 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=1 len(self.requested)=32
target=32 len(self.plan)=32 len(self.observed)=0 len(self.requested)=32

Context

Setup: JobQueueCluster creates grouped workers when processes > 1:

# In JobQueueCluster.__init__
if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1:
    worker["group"] = ["-" + str(i) for i in range(self._job_kwargs["processes"])]

This creates a worker_spec entry like:

python"cluster-0": {
    "cls": SLURMJob, 
    "options": {...},
    "group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]
}

The Bug: When workers die, _update_worker_status receives expanded names ("cluster-0-0") but tries to look them up in self.workers, which is keyed by job name ("cluster-0"):

# Current broken implementation
def _update_worker_status(self, op, msg):
    if op == "remove":
        name = self.scheduler_info["workers"][msg]["name"]  # "cluster-0-0"
        def f():
            if name in self.workers:  # self.workers has "cluster-0", not "cluster-0-0"
                # This never executes for grouped workers!
                del self.workers[name]

Result:

  • self.workers keeps dead job objects
  • self.worker_spec keeps their specifications
  • adaptive doesn't scale up
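
To make the mismatch concrete, here's a minimal standalone sketch with toy data (plain dicts standing in for the real cluster state, not the actual SpecCluster objects):

# Toy stand-ins for SpecCluster state
workers = {"cluster-0": "<SLURMJob instance>"}  # keyed by job name
worker_spec = {
    "cluster-0": {"group": ["-0", "-1", "-2", "-3", "-4", "-5", "-6", "-7"]},
}

# The scheduler reports the expanded, per-process name:
name = "cluster-0-0"

# The lookup in the removal path fails, so the dead job is never cleaned up:
print(name in workers)  # False

# The names the scheduler actually sees for this job:
expanded = [job + suffix for job, spec in worker_spec.items() for suffix in spec["group"]]
print(expanded)  # ['cluster-0-0', 'cluster-0-1', ..., 'cluster-0-7']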

— alisterburt, Aug 26 '25

For a fix, SpecCluster._update_worker_status will need to:

  1. Remove from both self.workers AND self.worker_spec when a remove signal is sent from the scheduler
  2. Handle both grouped and non-grouped workers

A choice needs to be made about how to interpret a removal signal for one worker in a group (see the sketch after this list). Either:

  • a group is removed when one member is removed
  • a group is removed only when all members are removed
  • a group is removed when some fraction of members are removed
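
As a sketch of how that choice could be parameterized, here is a hypothetical helper; the policy and fraction arguments are made-up names for illustration, not existing distributed options:

# Hypothetical helper, not part of distributed: decides whether a grouped
# job should be torn down, given the set of its members known to be dead.
def group_should_be_removed(dead_members, group_names, policy="any", fraction=0.5):
    n_dead = len(set(dead_members) & set(group_names))
    if policy == "any":        # a group is removed when one member is removed
        return n_dead >= 1
    if policy == "all":        # a group is removed only when all members are removed
        return n_dead == len(group_names)
    if policy == "fraction":   # a group is removed when some fraction of members are removed
        return n_dead >= fraction * len(group_names)
    raise ValueError(f"unknown policy: {policy!r}")

group = ["cluster-0" + s for s in ["-0", "-1", "-2", "-3"]]
print(group_should_be_removed({"cluster-0-1"}, group))                      # True
print(group_should_be_removed({"cluster-0-1"}, group, policy="all"))        # False
print(group_should_be_removed({"cluster-0-1"}, group, policy="fraction"))   # False (1/4 < 0.5)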

Fix outline

Below is an outline of the fix I came up with. It implements the aggressive-cleanup version, where a group of workers is removed when a remove signal is received for a single member of the group.

GROUPED, NON-GROUPED, and BOTH tag code that is relevant to grouped workers, non-grouped workers, or both.

# Imports needed by this outline:
import asyncio

import dask
from dask.utils import parse_timedelta

# In SpecCluster:
def _update_worker_status(self, op, msg):
    if op == "remove":
        worker_name = self.scheduler_info["workers"][msg]["name"]
        
        def f():
            # BOTH: If a worker with this name is known to the scheduler
            # again, it reconnected during the grace period; do nothing
            if any(d.get("name") == worker_name for d in self.scheduler_info.get("workers", {}).values()):
                return
            
            # BOTH: Iterate through all job specs to find which job owns this worker
            for job_name, spec in list(self.worker_spec.items()):
                worker_found = False
                
                # GROUPED: Handle jobs with multiple worker processes
                if "group" in spec:
                    # GROUPED: Build list of all worker names for this job
                    group_names = [str(job_name) + suffix for suffix in spec["group"]]
                    
                    # GROUPED: Check if the removed worker belongs to this job
                    if worker_name in group_names:
                        worker_found = True
                        
                        # GROUPED: Remove entire job when ANY worker dies
                        # Assumption: partial failure means total failure
                        if job_name in self.workers:
                            self._futures.add(asyncio.ensure_future(self.workers[job_name].close()))
                            del self.workers[job_name]
                        del self.worker_spec[job_name]
                
                # NON-GROUPED: Handle single-process jobs
                elif worker_name == job_name:
                    # NON-GROUPED: For single workers, the worker name matches the job name
                    worker_found = True
                    
                    # NON-GROUPED: Remove immediately (only one worker anyway)
                    if job_name in self.workers:
                        self._futures.add(asyncio.ensure_future(self.workers[job_name].close()))
                        del self.workers[job_name]
                    del self.worker_spec[job_name]
                
                # BOTH: Stop searching once we've found and handled the worker
                if worker_found:
                    break
        
        # BOTH: Wait before removing to handle temporary disconnections
        delay = parse_timedelta(dask.config.get("distributed.deploy.lost-worker-timeout"))
        asyncio.get_running_loop().call_later(delay, f)
    
    # BOTH: Call parent implementation for any additional handling
    super()._update_worker_status(op, msg)
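
For what it's worth, the owning-job lookup above could be factored out into a standalone helper (hypothetical, purely illustrative) so the matching logic can be unit-tested without a live cluster:

# Hypothetical helper mirroring the lookup in the outline above; it only
# answers "which job owns this worker name?", with no side effects.
def find_owning_job(worker_name, worker_spec):
    for job_name, spec in worker_spec.items():
        if "group" in spec:
            if worker_name in [str(job_name) + suffix for suffix in spec["group"]]:
                return job_name
        elif worker_name == job_name:
            return job_name
    return None

spec = {"cluster-0": {"group": ["-0", "-1"]}, "cluster-1": {}}
assert find_owning_job("cluster-0-1", spec) == "cluster-0"  # grouped worker
assert find_owning_job("cluster-1", spec) == "cluster-1"    # non-grouped worker
assert find_owning_job("cluster-2-0", spec) is None         # unknown worker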

— alisterburt, Aug 26 '25

Once we make a choice about how to interpret the removal signal for a grouped worker, I'm happy to go ahead with a PR; I just need to know what the maintainers here prefer.

We should probably fix #9103 while we're at it.

— alisterburt, Aug 26 '25