
Resource scheduling bug: Scheduler allows over-allocation by ignoring worker's current resource usage

g199209 opened this issue 4 months ago • 2 comments

Describe the issue:

Dask scheduler incorrectly allows resource over-allocation when scheduling tasks with custom resource requirements. The scheduler's valid_workers() method only checks if a worker's total declared resources meet the task requirements, but ignores the worker's currently used resources. This leads to severe resource over-allocation (up to 450% in our tests) and inconsistent resource accounting between scheduler and worker.

The worker layer correctly enforces resource limits, but this creates a discrepancy where the scheduler reports tasks as "processing" while they're actually queued due to resource constraints.

Minimal Complete Verifiable Example:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import dask
from distributed import LocalCluster, Client

def simple_task(task_id, duration=15):
    """Simple task that requires MEM resources"""
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")
    return f"Result from task {task_id}"

def test_resource_bug():
    print("=" * 60)
    print("DASK RESOURCE SCHEDULING BUG REPRODUCTION")
    print("=" * 60)
    print(f"Dask version: {dask.__version__}")
    
    # Create a local cluster with one worker having limited MEM resources
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=20,  # High thread count to avoid thread limit interference
        memory_limit=None,  # Disable memory limit to focus on custom resources
        resources={"MEM": 10},  # Worker declares 10 MEM resources
        dashboard_address=None,
        silence_logs=False
    )
    
    client = Client(cluster)
    
    try:
        print(f"\nCluster Info:")
        print(f"Workers: {len(client.scheduler_info()['workers'])}")
        
        # Helper function to get resource info from scheduler
        def get_worker_resources(dask_scheduler):
            scheduler = dask_scheduler
            worker_info = {}
            for addr, worker in scheduler.workers.items():
                worker_info[addr] = {
                    'declared_resources': dict(worker.resources),
                    'used_resources': dict(worker.used_resources),
                    'processing_count': len(worker.processing)
                }
            return worker_info
        
        # Check initial resources
        print(f"\nInitial Worker Resources:")
        initial_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in initial_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
        
        print("\nSubmitting 15 tasks, each requiring 3 MEM resources...")
        print("   Worker declares: 10 MEM")
        print("   Total requirement: 15 * 3 = 45 MEM") 
        print("   Expected (strict): Only 3 tasks should run (3*3=9 <= 10 MEM)")
        print("   Let's see how many actually run simultaneously...")
        print("   Worker has 20 threads, so thread count is NOT a limiting factor")
        
        # Submit 15 tasks, each requiring 3 MEM resources
        # Total requirement: 15 * 3 = 45 MEM
        # Available: 10 MEM
        # Let's see how many actually run
        futures = []
        for i in range(1, 16):
            future = client.submit(
                simple_task, 
                task_id=i, 
                duration=15,  # Long enough to observe resource allocation
                resources={'MEM': 3},
                pure=False
            )
            futures.append(future)
            print(f"  Submitted task {i} (requires 3 MEM)")
        
        # Wait for tasks to start executing
        print("\nWaiting for tasks to start...")
        
        # Wait until at least some tasks are running
        max_wait = 10
        wait_time = 0
        while wait_time < max_wait:
            current_check = client.run_on_scheduler(get_worker_resources)
            running_tasks = sum(info['processing_count'] for info in current_check.values())
            if running_tasks > 0:
                print(f"Tasks started! {running_tasks} tasks now running.")
                break
            time.sleep(0.5)
            wait_time += 0.5
        
        if wait_time >= max_wait:
            print("Warning: Tasks may not have started yet")
        
        # Check resource usage after submission
        print(f"\nResource Usage After Task Submission:")
        current_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in current_resources.items():
            declared = info['declared_resources'].get('MEM', 0)
            used = info['used_resources'].get('MEM', 0)
            tasks = info['processing_count']
            overallocation = (used / declared * 100) if declared > 0 else 0
            
            print(f"  Worker: declared_MEM={declared}, used_MEM={used}, processing_tasks={tasks}")
            print(f"  Resource utilization: {overallocation:.1f}%")
            
            # Now with 20 threads, we should see all 15 tasks actually running if there's no resource limit
            print(f"  NOTE: Worker has 20 threads, so thread limit should NOT be a constraint")
            
            if used > declared:
                print(f"  *** RESOURCE ACCOUNTING BUG DETECTED!")
                print(f"      Scheduler reserved {used} MEM for {tasks} tasks")
                print(f"      but worker only declared {declared} MEM resources!")
                print(f"      This shows scheduler doesn't check available resources!")
                print(f"      Overallocation: {used - declared} MEM over limit")
            elif tasks < 15:
                print(f"  PARTIAL SCHEDULING: {tasks} tasks running, {15-tasks} waiting")
            else:
                print(f"  FULL OVERCOMMIT: All {tasks} tasks scheduled!")
                
        # Count how many tasks are actually running by checking logs
        # We'll compare this with scheduler's processing count
        
        # Check task statuses
        print(f"\nTask Statuses:")
        for i, future in enumerate(futures, 1):
            print(f"  Task {i}: {future.status}")
        
        # Cancel tasks to complete demonstration
        print(f"\nCancelling tasks to complete demonstration...")
        for future in futures:
            future.cancel()
        
        time.sleep(2)  # Wait for cancellation to complete
        
        # Final resource check
        print(f"\nFinal Resource Usage (after cancellation):")
        final_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in final_resources.items():
            print(f"  Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
                  
    finally:
        print(f"\nClosing cluster...")
        client.close()
        cluster.close()

    print(f"\n" + "=" * 60)
    print("BUG REPORT SUMMARY:")
    print("- Issue: Dask scheduler doesn't check current resource usage")
    print("- Location: distributed/scheduler.py, valid_workers() method")
    print("- Line ~3279-3281: if supplied >= required:")
    print("- Fix needed: if (supplied - used_resources) >= required:")
    print("- Version affected: dask==2025.5.1 (possibly others)")
    print("=" * 60)

if __name__ == "__main__":
    test_resource_bug()

Expected Output (showing the bug):

Resource Usage After Task Submission:
  Worker: declared_MEM=10, used_MEM=45, processing_tasks=15
  Resource utilization: 450.0%
  *** RESOURCE ACCOUNTING BUG DETECTED!
      Scheduler reserved 45 MEM for 15 tasks
      but worker only declared 10 MEM resources!
      Overallocation: 35 MEM over limit

# But in the logs, only 3 tasks actually start:
Task 1 starting...
Task 2 starting...
Task 3 starting...
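
The worker-side view can be checked independently of the scheduler's bookkeeping with a small diagnostic run via Client.run (a sketch only; the attribute path dask_worker.state.available_resources / executing_count is an assumption about the current worker-state layout and may differ between versions):

def worker_view(dask_worker):
    # dask_worker is injected by Client.run; .state is the worker's internal state machine
    state = dask_worker.state
    return {
        "available_resources": dict(state.available_resources),
        "executing_count": state.executing_count,
    }

print(client.run(worker_view))
# In the scenario above this should report roughly {"MEM": 1} available and 3
# executing tasks, while the scheduler simultaneously reports used_MEM=45 and
# 15 "processing" tasks.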

Root Cause Analysis:

This is a two-layer problem: the scheduler and the worker manage resources independently, and their accounting is inconsistent:

1. Scheduler Layer Bug (distributed/scheduler.py:3279-3281)

The valid_workers() method in the scheduler has a critical flaw:

# Current buggy code
for addr, supplied in dr.items():
    if supplied >= required:  # 🚨 Only checks total declared, not available
        sw.add(addr)

Should be fixed to:

for addr, supplied in dr.items():
    ws = self.workers[addr] 
    available = supplied - ws.used_resources.get(resource, 0)
    if available >= required:
        sw.add(addr)

2. Worker Layer (Working Correctly)

The worker correctly enforces resource limits via distributed/worker_state_machine.py:

def _resource_restrictions_satisfied(self, ts: TaskState) -> bool:
    if not ts.resource_restrictions:
        return True
    return all(
        self.available_resources[resource] >= needed
        for resource, needed in ts.resource_restrictions.items()
    )
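
For intuition, the worker's resource gate behaves conceptually like the sketch below (illustrative only, not the actual dask source): resources are subtracted when a task starts executing and returned when it finishes, so with 10 declared MEM at most three 3-MEM tasks execute at once.

# Conceptual model of the worker-side accounting (not dask source code)
available = {"MEM": 10}

def try_start(needs):
    if all(available[r] >= n for r, n in needs.items()):
        for r, n in needs.items():
            available[r] -= n   # acquire before executing
        return True
    return False                # otherwise the task waits on the worker

def finish(needs):
    for r, n in needs.items():
        available[r] += n       # release, letting a waiting task start

Calling try_start({"MEM": 3}) four times succeeds three times and fails on the fourth, which matches the three "Task N starting..." lines in the logs.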

3. The Discrepancy

  • Scheduler says: 15 tasks processing, 45 MEM used (450% over-allocation)
  • Worker reality: Only 3 tasks executing, 9 MEM actually used
  • Result: Resource accounting is completely unreliable

Anything else we need to know?:

This bug has significant implications:

  1. Resource accounting is unreliable: used_resources shows false data
  2. Monitoring tools report incorrect resource utilization
  3. Scheduler makes poor decisions based on wrong resource state
  4. Can lead to resource starvation or inefficient scheduling

The worker's protection mechanism prevents catastrophic over-allocation, but the scheduler's incorrect accounting creates system inconsistency.

Environment:

  • Dask version: 2025.5.1
  • Python version: 3.12
  • Operating System: Rocky Linux 8.8
  • Install method (conda, pip, source): pip

g199209 · Sep 09 '25 08:09

I've implemented a monkey patch for this resource scheduling bug that works in my specific use case; I'd like to share it here for discussion and get feedback on whether there are better approaches.

Issue Analysis

After investigating the bug in distributed/scheduler.py:3279-3281, I found that fixing only the valid_workers() method is insufficient to resolve the complete resource scheduling issue.

1. The Original Bug

The valid_workers() method only checked total declared resources:

# Buggy code in distributed/scheduler.py:3279-3281
for addr, supplied in dr.items():
    if supplied >= required:  # Only checks total, ignores used resources
        sw.add(addr)

2. The Incomplete Fix

Simply fixing the resource check prevents over-allocation but creates a new problem:

# Fixed resource check
available = supplied - used
if available >= required:
    sw.add(addr)

This fix correctly prevents resource over-allocation, but tasks that are rejected get stuck in no-worker state indefinitely.

3. The Missing Piece: Task Rescheduling

Once tasks are in no-worker state, Dask does not automatically reschedule them when resources become available. This leads to:

  • First batch of tasks executes correctly
  • Remaining tasks get stuck in no-worker state forever
  • Resources are released but never utilized again
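
The stuck tasks can be observed directly on the scheduler (a hedged sketch; Scheduler.unrunnable holds TaskState objects, as a set in older versions and a dict keyed by TaskState in newer ones, so iterating should work either way):

def stuck_tasks(dask_scheduler):
    # Tasks the scheduler currently considers unrunnable ("no-worker")
    return [
        (ts.key, ts.state, dict(ts.resource_restrictions or {}))
        for ts in dask_scheduler.unrunnable
    ]

print(client.run_on_scheduler(stuck_tasks))
# With only the resource check fixed, this list stays non-empty even after the
# first batch of tasks finishes and releases its MEM.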

My Monkey Patch Solution (Working but may not be perfect)

Part 1: Fix Resource Checking (valid_workers method)

def _fixed_valid_workers(self, ts):
    # ... (worker_restrictions and host_restrictions unchanged)
    
    if ts.resource_restrictions:
        for resource, required in ts.resource_restrictions.items():
            # ... (setup code unchanged)
            
            for addr, supplied in dr.items():
                # 🚨 FIX: Check available resources, not just total declared
                ws = self.workers.get(addr)
                if ws is not None:
                    used = ws.used_resources.get(resource, 0)
                    available = supplied - used  # KEY FIX
                    if available >= required:
                        sw.add(addr)
                else:
                    # Fallback to original behavior if worker not found
                    if supplied >= required:
                        sw.add(addr)
    
    # ... (rest unchanged)

Part 2: Add Automatic Rescheduling (release_resources method)

def _fixed_release_resources(self, ts, ws):
    # Call original release_resources first
    self._original_release_resources(ts, ws)
    
    # 🚀 ENHANCEMENT: Re-check no-worker tasks after resource release
    if ts.resource_restrictions:
        no_worker_tasks = []
        released_resources = set(ts.resource_restrictions.keys())
        
        for no_worker_ts in list(self.unrunnable):
            if (no_worker_ts.state == 'no-worker' and no_worker_ts.resource_restrictions):
                task_resources = set(no_worker_ts.resource_restrictions.keys())
                if task_resources.intersection(released_resources):
                    no_worker_tasks.append(no_worker_ts)
        
        if no_worker_tasks:
            # Attempt to reschedule eligible tasks
            recommendations = {ts.key: "processing" for ts in no_worker_tasks}
            try:
                self.transitions(recommendations, stimulus_id=f"resource-released-{time.time()}")
            except Exception:
                pass
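
For reference, the two replacements above can be installed along these lines (a sketch; the exact attribute names on Scheduler are assumptions based on the methods named in this issue and should be checked against the installed version, and note that _fixed_release_resources also needs import time at module scope for the stimulus_id):

from distributed.scheduler import Scheduler

# Keep a reference to the original so the patched method can delegate to it
Scheduler._original_release_resources = Scheduler.release_resources

# Patch the class before any scheduler/cluster is created in this process
Scheduler.valid_workers = _fixed_valid_workers
Scheduler.release_resources = _fixed_release_resources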

I'd appreciate feedback on better approaches:

  1. Is there an existing mechanism in Dask that should automatically reschedule no-worker tasks when resources become available?
  2. What's the proper way to trigger rescheduling without manually calling self.transitions()?
  3. Should this logic be in a scheduler plugin instead of monkey patching core methods? (a rough sketch of this option follows after this list)
  4. Are there performance or safety concerns with this approach that I should be aware of?
  5. Is there a cleaner way to hook into the resource release process?
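
Regarding question 3, a scheduler plugin could apply the same patches per scheduler instance at startup instead of patching the class globally. This is a rough sketch assuming the documented SchedulerPlugin.start hook; the registration call differs by version (register_plugin vs register_scheduler_plugin):

import types
from distributed.diagnostics.plugin import SchedulerPlugin

class ResourcePatchPlugin(SchedulerPlugin):
    """Bind the patched methods to a single scheduler instance at startup."""

    async def start(self, scheduler):
        scheduler._original_release_resources = scheduler.release_resources
        scheduler.valid_workers = types.MethodType(_fixed_valid_workers, scheduler)
        scheduler.release_resources = types.MethodType(_fixed_release_resources, scheduler)

# client.register_plugin(ResourcePatchPlugin())  # register_scheduler_plugin on older versions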

I'm happy to test alternative solutions or help implement a proper fix if the maintainers can provide guidance on the preferred approach. Thanks!

g199209 · Sep 10 '25 06:09

Thanks for taking the time to dig into this. There are a bunch of known issues with resource allocation in Dask. It's a feature that was added a long time ago but hasn't necessarily been cared for as other parts of the code have changed.

Unfortunately we are low on maintenance capacity at the moment and I'm not confident there is anyone around who can guide these changes.

If we can move towards a well-tested solution here and demonstrate that CI is robust with these changes, then I'm happy to review PRs and iterate on this, but I would need a lot of confidence that there are no regressions.

jacobtomlinson avatar Sep 10 '25 07:09 jacobtomlinson