Passing key as a list to Client.map() breaks Dask's queuing system
Describe the issue: This may not be a bug, but if it is not, it is unclear to me from the docs why Dask behaves this way. When a list of values is passed to client.map() as the key argument, the queuing system seems to break.
In the reproducible example:
- If `key` is unset or a constant string and I submit 1000 simple jobs, 282 move to processing (I have 282 cores) and the rest are queued, as expected (see the config check after this list).
- If `key` is a list whose length matches the input iterable, all 1000 tasks instantly move to processing.
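For context, scheduler-side queuing is governed by the distributed.scheduler.worker-saturation setting in recent versions (default 1.1); a minimal sketch of checking it through the standard dask.config API:

```python
import dask

# worker-saturation controls how many root tasks the scheduler sends to
# workers eagerly; the default of 1.1 leaves the remainder queued on the
# scheduler, while "inf" disables queuing entirely.
print(dask.config.get("distributed.scheduler.worker-saturation"))
```

If this is at its default, I would expect queuing in both cases.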
Minimal Complete Verifiable Example:

```python
from distributed import Client, progress


def simple_task_wait_10s(id: str, logger=None):
    from time import sleep
    sleep(10)  # simulate ~10 s of work
    return 0


if __name__ == '__main__':
    iterabs = [str(i) for i in range(1000)]
    c = Client(address='tcp://xxx.xxx.xxx.xxx:8786')
    data = c.map(simple_task_wait_10s,
                 iterabs,
                 pure=False,
                 # key=iterabs
                 )
    progress(data)
    results = c.gather(data)
```
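For reference, the two `key` variants compared above, continuing from the MCVE; per the Client.map docstring, `key` accepts either a string (used as a prefix for generated task names) or a list of explicit names, one per input:

```python
# Variant A: constant string -> used as a key prefix; tasks queue as expected.
data = c.map(simple_task_wait_10s, iterabs, pure=False, key="simple-task")

# Variant B: explicit list of keys (one per input); with this, queued=0 and
# all 1000 tasks move straight to processing.
data = c.map(simple_task_wait_10s, iterabs, pure=False, key=iterabs)
```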
Running the MCVE as written produces the following output on the dashboard; notice the number of jobs queued vs. processing:

[dashboard screenshot: 282 tasks processing, the remaining 718 queued]

Uncommenting the line `key=iterabs` produces the following instead; notice queued=0:

[dashboard screenshot: all 1000 tasks processing, 0 queued]
Anything else we need to know?:
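In case it helps triage, task states can also be counted directly on the scheduler rather than read off the dashboard; a minimal sketch (run_on_scheduler and Scheduler.tasks are documented API; the task_states helper here is just for illustration):

```python
from collections import Counter

def task_states(dask_scheduler):
    # dask_scheduler is injected by run_on_scheduler; each TaskState
    # carries its current state ("queued", "processing", "memory", ...).
    return Counter(ts.state for ts in dask_scheduler.tasks.values())

print(c.run_on_scheduler(task_states))
```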
Environment:
- Dask version: dask=2024.5.2, distributed=2024.5.2
- Python version: 3.10
- Operating System: jobs submitted from macOS 14.5, scheduler and worker running on Ubuntu 22.04.3 LTS
- Install method (conda, pip, source): pip (poetry)