Resource scheduling bug: Scheduler allows over-allocation by ignoring worker's current resource usage
Describe the issue:
Dask scheduler incorrectly allows resource over-allocation when scheduling tasks with custom resource requirements. The scheduler's valid_workers() method only checks if a worker's total declared resources meet the task requirements, but ignores the worker's currently used resources. This leads to severe resource over-allocation (up to 450% in our tests) and inconsistent resource accounting between scheduler and worker.
The worker layer correctly enforces resource limits, but this creates a discrepancy where the scheduler reports tasks as "processing" while they're actually queued due to resource constraints.
Minimal Complete Verifiable Example:
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time

import dask
from distributed import LocalCluster, Client


def simple_task(task_id, duration=15):
    """Simple task that requires MEM resources"""
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")
    return f"Result from task {task_id}"


def test_resource_bug():
    print("=" * 60)
    print("DASK RESOURCE SCHEDULING BUG REPRODUCTION")
    print("=" * 60)
    print(f"Dask version: {dask.__version__}")

    # Create a local cluster with one worker having limited MEM resources
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=20,  # High thread count to avoid thread limit interference
        memory_limit=None,      # Disable memory limit to focus on custom resources
        resources={"MEM": 10},  # Worker declares 10 MEM resources
        dashboard_address=None,
        silence_logs=False,
    )
    client = Client(cluster)

    try:
        print(f"\nCluster Info:")
        print(f"Workers: {len(client.scheduler_info()['workers'])}")

        # Helper function to get resource info from scheduler
        def get_worker_resources(dask_scheduler):
            scheduler = dask_scheduler
            worker_info = {}
            for addr, worker in scheduler.workers.items():
                worker_info[addr] = {
                    'declared_resources': dict(worker.resources),
                    'used_resources': dict(worker.used_resources),
                    'processing_count': len(worker.processing),
                }
            return worker_info

        # Check initial resources
        print(f"\nInitial Worker Resources:")
        initial_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in initial_resources.items():
            print(f" Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")

        print("\nSubmitting 15 tasks, each requiring 3 MEM resources...")
        print(" Worker declares: 10 MEM")
        print(" Total requirement: 15 * 3 = 45 MEM")
        print(" Expected (strict): Only 3 tasks should run (3*3=9 <= 10 MEM)")
        print(" Let's see how many actually run simultaneously...")
        print(" Worker has 20 threads, so thread count is NOT a limiting factor")

        # Submit 15 tasks, each requiring 3 MEM resources
        # Total requirement: 15 * 3 = 45 MEM
        # Available: 10 MEM
        # Let's see how many actually run
        futures = []
        for i in range(1, 16):
            future = client.submit(
                simple_task,
                task_id=i,
                duration=15,  # Long enough to observe resource allocation
                resources={'MEM': 3},
                pure=False,
            )
            futures.append(future)
            print(f" Submitted task {i} (requires 3 MEM)")

        # Wait until at least some tasks are running
        print("\nWaiting for tasks to start...")
        max_wait = 10
        wait_time = 0
        while wait_time < max_wait:
            current_check = client.run_on_scheduler(get_worker_resources)
            running_tasks = sum(info['processing_count'] for info in current_check.values())
            if running_tasks > 0:
                print(f"Tasks started! {running_tasks} tasks now running.")
                break
            time.sleep(0.5)
            wait_time += 0.5
        if wait_time >= max_wait:
            print("Warning: Tasks may not have started yet")

        # Check resource usage after submission
        print(f"\nResource Usage After Task Submission:")
        current_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in current_resources.items():
            declared = info['declared_resources'].get('MEM', 0)
            used = info['used_resources'].get('MEM', 0)
            tasks = info['processing_count']
            overallocation = (used / declared * 100) if declared > 0 else 0
            print(f" Worker: declared_MEM={declared}, used_MEM={used}, processing_tasks={tasks}")
            print(f" Resource utilization: {overallocation:.1f}%")
            # With 20 threads, we should see all 15 tasks actually running if there's no resource limit
            print(f" NOTE: Worker has 20 threads, so thread limit should NOT be a constraint")
            if used > declared:
                print(f" *** RESOURCE ACCOUNTING BUG DETECTED!")
                print(f" Scheduler reserved {used} MEM for {tasks} tasks")
                print(f" but worker only declared {declared} MEM resources!")
                print(f" This shows scheduler doesn't check available resources!")
                print(f" Overallocation: {used - declared} MEM over limit")
            elif tasks < 15:
                print(f" PARTIAL SCHEDULING: {tasks} tasks running, {15-tasks} waiting")
            else:
                print(f" FULL OVERCOMMIT: All {tasks} tasks scheduled!")

        # Count how many tasks are actually running by checking logs
        # We'll compare this with scheduler's processing count
        print(f"\nTask Statuses:")
        for i, future in enumerate(futures, 1):
            print(f" Task {i}: {future.status}")

        # Cancel tasks to complete demonstration
        print(f"\nCancelling tasks to complete demonstration...")
        for future in futures:
            future.cancel()
        time.sleep(2)  # Wait for cancellation to complete

        # Final resource check
        print(f"\nFinal Resource Usage (after cancellation):")
        final_resources = client.run_on_scheduler(get_worker_resources)
        for addr, info in final_resources.items():
            print(f" Worker: declared_MEM={info['declared_resources'].get('MEM', 0)}, "
                  f"used_MEM={info['used_resources'].get('MEM', 0)}, "
                  f"tasks={info['processing_count']}")
    finally:
        print(f"\nClosing cluster...")
        client.close()
        cluster.close()

    print("\n" + "=" * 60)
    print("BUG REPORT SUMMARY:")
    print("- Issue: Dask scheduler doesn't check current resource usage")
    print("- Location: distributed/scheduler.py, valid_workers() method")
    print("- Line ~3279-3281: if supplied >= required:")
    print("- Fix needed: if (supplied - used_resources) >= required:")
    print("- Version affected: dask==2025.5.1 (possibly others)")
    print("=" * 60)


if __name__ == "__main__":
    test_resource_bug()
```
Expected Output (showing the bug):
```
Resource Usage After Task Submission:
  Worker: declared_MEM=10, used_MEM=45, processing_tasks=15
  Resource utilization: 450.0%
  *** RESOURCE ACCOUNTING BUG DETECTED!
  Scheduler reserved 45 MEM for 15 tasks
  but worker only declared 10 MEM resources!
  Overallocation: 35 MEM over limit

# But in the logs, only 3 tasks actually start:
Task 1 starting...
Task 2 starting...
Task 3 starting...
```
Root Cause Analysis:
This is a dual-layer architecture problem with inconsistent resource management between scheduler and worker:
1. Scheduler Layer Bug (distributed/scheduler.py:3279-3281)
The valid_workers() method in the scheduler has a critical flaw:
```python
# Current buggy code
for addr, supplied in dr.items():
    if supplied >= required:  # 🚨 Only checks total declared, not available
        sw.add(addr)
```
Should be fixed to:
```python
for addr, supplied in dr.items():
    ws = self.workers[addr]
    available = supplied - ws.used_resources.get(resource, 0)
    if available >= required:
        sw.add(addr)
```
2. Worker Layer (Working Correctly)
The worker correctly enforces resource limits via distributed/worker_state_machine.py:
```python
def _resource_restrictions_satisfied(self, ts: TaskState) -> bool:
    if not ts.resource_restrictions:
        return True
    return all(
        self.available_resources[resource] >= needed
        for resource, needed in ts.resource_restrictions.items()
    )
```
3. The Discrepancy
- Scheduler says: 15 tasks processing, 45 MEM used (450% over-allocation)
- Worker reality: Only 3 tasks executing, 9 MEM actually used
- Result: Resource accounting is completely unreliable
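A quick way to see this mismatch at runtime is to query both layers with the same client. This is only a sketch against the `client` from the MCVE above; it assumes a recent distributed version where the worker exposes its state machine as `dask_worker.state` with `executing` and `available_resources` attributes.

```python
# Sketch: compare scheduler-side accounting with worker-side reality.
# Assumes dask_worker.state exposes `executing` and `available_resources`
# as in recent distributed releases.

def scheduler_view(dask_scheduler):
    # What the scheduler thinks each worker is doing
    return {
        addr: {
            "processing": len(ws.processing),
            "used_resources": dict(ws.used_resources),
        }
        for addr, ws in dask_scheduler.workers.items()
    }

def worker_view(dask_worker):
    # What the worker is actually executing
    return {
        "executing": len(dask_worker.state.executing),
        "available_resources": dict(dask_worker.state.available_resources),
    }

print(client.run_on_scheduler(scheduler_view))  # e.g. processing=15, used MEM=45
print(client.run(worker_view))                  # e.g. executing=3, available MEM=1
```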
Anything else we need to know?:
This bug has significant implications:
- Resource accounting is unreliable: `used_resources` shows false data
- Monitoring tools report incorrect resource utilization
- Scheduler makes poor decisions based on wrong resource state
- Can lead to resource starvation or inefficient scheduling
The worker's protection mechanism prevents catastrophic over-allocation, but the scheduler's incorrect accounting creates system inconsistency.
Environment:
- Dask version: 2025.5.1
- Python version: 3.12
- Operating System: Rocky Linux 8.8
- Install method (conda, pip, source): pip
I've implemented a monkey patch that works around this resource scheduling bug in my specific use case, and I'd like to share it here for discussion and to get feedback on whether there are better approaches.
Issue Analysis
After investigating the bug in distributed/scheduler.py:3279-3281, I found that fixing only the valid_workers() method is insufficient to resolve the complete resource scheduling issue.
1. The Original Bug
The valid_workers() method only checked total declared resources:
```python
# Buggy code in distributed/scheduler.py:3279-3281
for addr, supplied in dr.items():
    if supplied >= required:  # Only checks total, ignores used resources
        sw.add(addr)
```
2. The Incomplete Fix
Simply fixing the resource check prevents over-allocation but creates a new problem:
```python
# Fixed resource check
available = supplied - used
if available >= required:
    sw.add(addr)
```
This fix correctly prevents resource over-allocation, but tasks that are rejected get stuck in no-worker state indefinitely.
3. The Missing Piece: Task Rescheduling
Once tasks are in no-worker state, Dask does not automatically reschedule them when resources become available. This leads to:
- First batch of tasks executes correctly
- Remaining tasks get stuck in `no-worker` state forever
- Resources are released but never utilized again
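With the fixed resource check applied, the stuck tasks can be confirmed directly on the scheduler. This is a sketch only; it assumes that iterating `Scheduler.unrunnable` yields scheduler-side `TaskState` objects, which is what the patch below also relies on.

```python
# Sketch: list tasks parked in "no-worker", with their resource requirements.
def stuck_tasks(dask_scheduler):
    return [
        (ts.key, ts.state, dict(ts.resource_restrictions or {}))
        for ts in dask_scheduler.unrunnable
    ]

print(client.run_on_scheduler(stuck_tasks))
```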
My Monkey Patch Solution (Working but may not be perfect)
Part 1: Fix Resource Checking (valid_workers method)
```python
def _fixed_valid_workers(self, ts):
    # ... (worker_restrictions and host_restrictions unchanged)
    if ts.resource_restrictions:
        for resource, required in ts.resource_restrictions.items():
            # ... (setup code unchanged)
            for addr, supplied in dr.items():
                # 🚨 FIX: Check available resources, not just total declared
                ws = self.workers.get(addr)
                if ws is not None:
                    used = ws.used_resources.get(resource, 0)
                    available = supplied - used  # KEY FIX
                    if available >= required:
                        sw.add(addr)
                else:
                    # Fallback to original behavior if worker not found
                    if supplied >= required:
                        sw.add(addr)
    # ... (rest unchanged)
```
Part 2: Add Automatic Rescheduling (release_resources method)
```python
def _fixed_release_resources(self, ts, ws):
    # Call original release_resources first
    self._original_release_resources(ts, ws)

    # 🚀 ENHANCEMENT: Re-check no-worker tasks after resource release
    if ts.resource_restrictions:
        no_worker_tasks = []
        released_resources = set(ts.resource_restrictions.keys())
        for no_worker_ts in list(self.unrunnable):
            if (no_worker_ts.state == 'no-worker' and no_worker_ts.resource_restrictions):
                task_resources = set(no_worker_ts.resource_restrictions.keys())
                if task_resources.intersection(released_resources):
                    no_worker_tasks.append(no_worker_ts)
        if no_worker_tasks:
            # Attempt to reschedule eligible tasks
            recommendations = {ts.key: "processing" for ts in no_worker_tasks}
            try:
                self.transitions(recommendations, stimulus_id=f"resource-released-{time.time()}")
            except Exception:
                pass
```
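For completeness, this is roughly how the two patches can be wired in. It is only a sketch: it assumes an in-process `LocalCluster` so `distributed.scheduler.Scheduler` can be patched before the cluster is created, and that the method names used above (`valid_workers`, `release_resources`) exist unchanged in the running version.

```python
# Sketch: attach the patches before creating the cluster. The helper names
# (_fixed_valid_workers, _fixed_release_resources) are the functions
# defined above.
from distributed.scheduler import Scheduler

Scheduler._original_release_resources = Scheduler.release_resources
Scheduler.valid_workers = _fixed_valid_workers
Scheduler.release_resources = _fixed_release_resources
```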
I'd appreciate feedback on better approaches:
- Is there an existing mechanism in Dask that should automatically reschedule `no-worker` tasks when resources become available?
- What's the proper way to trigger rescheduling without manually calling `self.transitions()`?
- Should this logic be in a scheduler plugin instead of monkey patching core methods? (a rough sketch follows this list)
- Are there performance or safety concerns with this approach that I should be aware of?
- Is there a cleaner way to hook into the resource release process?
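On the plugin question, here is an untested sketch of what a plugin-based version of Part 2 might look like. It assumes the `SchedulerPlugin.transition` hook receives the task key plus the start/finish states, and that `Scheduler.unrunnable` yields `TaskState` objects as above; it is meant only to frame the discussion, not as a vetted implementation.

```python
# Untested sketch: rescheduling hook as a SchedulerPlugin instead of a
# monkey patch. Re-checks "no-worker" tasks whenever a task leaves
# "processing", i.e. when its resources should have been released.
from distributed.diagnostics.plugin import SchedulerPlugin

class ResourceReschedulePlugin(SchedulerPlugin):
    def start(self, scheduler):
        # Keep a handle on the scheduler when the plugin is registered
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        if start == "processing" and finish in ("memory", "erred", "released"):
            recommendations = {
                ts.key: "processing"
                for ts in list(self.scheduler.unrunnable)
                if ts.resource_restrictions
            }
            if recommendations:
                self.scheduler.transitions(
                    recommendations, stimulus_id="resource-reschedule-plugin"
                )

# client.register_plugin(ResourceReschedulePlugin())
```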
I'm happy to test alternative solutions or help implement a proper fix if the maintainers can provide guidance on the preferred approach. Thanks!
Thanks for taking the time to dig into this. There are a number of known issues with resource allocation in Dask. It's a feature that was added a long time ago but hasn't necessarily been cared for as other parts of the code have changed.
Unfortunately we are low on maintenance capacity at the moment and I'm not confident there is anyone around who can guide these changes.
If we can move towards a well tested solution here and we can demonstrate that the CI is robust with these changes then I'm happy to review PRs and iterate on this, but would need a lot of confidence that there are no regressions here.