
Client.map() keys as a list breaks Dask queue system

Open arneyjfs opened this issue 8 months ago • 6 comments

Describe the issue: This may not be a bug, but if it isn't, it is unclear to me from the docs why Dask behaves like this. When passing a list of values to `client.map()` to use as keys, the queuing system seems to break.

In the reproducible example:

  • If `key` is left unset (or is a constant string) and I submit 1000 simple jobs, 282 move to processing (I have 282 cores) and the rest are queued.
  • If `key` is a list (length matching the input iterable), all 1000 instantly move to processing.
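For what it's worth, my guess (an assumption on my part, not something I've confirmed in the scheduler code) is that queuing decisions group tasks by their key prefix, so keys like `"0"`, `"1"`, ... each end up in their own group. A standalone sketch, using my own simplified `key_prefix` helper rather than Dask's real implementation, shows how the two key styles differ:

```python
from collections import defaultdict


def key_prefix(key: str) -> str:
    """Simplified stand-in for Dask's key-prefix logic:
    everything before the final '-', or the whole key if there is none."""
    head, sep, _ = key.rpartition("-")
    return head if sep else key


# Default-style keys all share one prefix...
default_keys = [f"simple_task_wait_60s-{i}" for i in range(1000)]
# ...whereas key=iterabs makes every key its own prefix.
custom_keys = [str(i) for i in range(1000)]


def group_sizes(keys):
    groups = defaultdict(int)
    for k in keys:
        groups[key_prefix(k)] += 1
    return dict(groups)


print(len(group_sizes(default_keys)))  # 1 group containing 1000 tasks
print(len(group_sizes(custom_keys)))   # 1000 groups of 1 task each
```

If grouping really works like this, 1000 single-task groups would look nothing like one big root-task group to the scheduler, which might explain why nothing gets queued.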

Minimal Complete Verifiable Example:

from distributed import Client, progress


def simple_task_wait_60s(id: int, logger=None):
    # Note: sleeps 10 s, not 60 s as the name suggests
    from time import sleep
    sleep(10)
    return 0


if __name__ == '__main__':
    iterabs = [str(i) for i in range(1000)]

    c = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    data = c.map(simple_task_wait_60s,
                 iterabs,
                 pure=False,
                 # key=iterabs
                 )
    progress(data)
    results = c.gather(data)

The above produces the following output on the dashboard. Notice the number of jobs queued vs. processing: (screenshot)

Uncommenting the line `key=iterabs` produces the following instead. Notice queued=0: (screenshot)
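As a possible workaround I have been considering (assuming the grouping really is prefix-based, which I haven't verified), I could keep my keys unique but give them a common prefix before the final dash:

```python
iterabs = [str(i) for i in range(1000)]

# Hypothetical workaround: unique keys that still share a common
# prefix (assumption: the scheduler groups tasks by key prefix).
keys = [f"simple_task_wait_60s-{i}" for i in iterabs]

# These would then be passed exactly like key=iterabs above:
#   data = c.map(simple_task_wait_60s, iterabs, pure=False, key=keys)
print(keys[0])  # simple_task_wait_60s-0
```

I haven't confirmed whether this restores the queuing behaviour, but it keeps the keys human-readable either way.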

Anything else we need to know?:

Environment:

  • Dask version: dask=2024.5.2, distributed=2024.5.2
  • Python version: 3.10
  • Operating System: jobs submitted from macOS 14.5, scheduler and worker running on Ubuntu 22.04.3 LTS
  • Install method (conda, pip, source): pip (poetry)

arneyjfs · Jun 05 '24 13:06