Allow min worker queue length of 0
My use case for Dask is primarily orchestrating the execution of long-running functions. By long-running, I mean minutes, and sometimes hours. Tasks can be submitted to a cluster in waves, eventually saturating all workers and causing some shorter-running tasks to get queued behind very long-running tasks. Work stealing is moot given that all workers are saturated.
I would like to prevent eager assignment of ready tasks to worker queues, allowing tasks to build up on the scheduler instead. Currently, the minimum worker queue length achievable is 1 (i.e., via a `worker-saturation` setting <= 1.0). This appears to be controlled via `distributed.scheduler._task_slots_available()`:
```python
def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int:
    """Number of tasks that can be sent to this worker without oversaturating it"""
    assert not math.isinf(saturation_factor)
    return max(math.ceil(saturation_factor * ws.nthreads), 1) - (
        len(ws.processing) - len(ws.long_running)
    )
```
All I ask is to expose a setting to allow a floor of 0 here. Thanks!
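For illustration, here is roughly the change I have in mind; the `floor` parameter is hypothetical, not an existing distributed setting:

```python
import math

# Hypothetical sketch only: the `floor` parameter is the setting this issue
# asks for; it does not exist in distributed today. With floor=0, a fully
# occupied worker advertises no free slots, so ready tasks stay on the
# scheduler instead of queuing behind long-running tasks on a worker.
def _task_slots_available(ws, saturation_factor: float, floor: int = 0) -> int:
    """Number of tasks that can be sent to this worker without oversaturating it."""
    assert not math.isinf(saturation_factor)
    return max(math.ceil(saturation_factor * ws.nthreads), floor) - (
        len(ws.processing) - len(ws.long_running)
    )
```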
A small word of caution: this isn't a common use case for Dask. Scheduling minute- or hour-scale tasks may not behave the way you expect (as you're already experiencing). You may want to explore tools like Prefect for this instead.
That being said, if you want to use Dask for this, then I'm sure we can round off some rough edges.
Your request is basically to block workers from taking tasks and then open that gate on the fly at some point in the future, is that right? Have you tried experimenting with this already? I feel like tasks will still get picked up in a FIFO way, and small tasks will be queued behind big ones.
I'm curious what kind of workload this is. Can you tell us more about your use case?
I don't think my earlier suggestion of setting that floor in `_task_slots_available()` would effectively disable the worker queue as I'd like, but I'm guessing you know that. :)
> A small word of caution: this isn't a common use case for Dask. Scheduling minute- or hour-scale tasks may not behave the way you expect (as you're already experiencing). You may want to explore tools like Prefect for this instead.
>
> I'm curious what kind of workload this is. Can you tell us more about your use case?
We can't move away from Dask at the moment, for reasons I'm not at liberty to detail in a public setting. The use case is financial markets, where we are building large task graphs and using Dask in a distributed cluster. Some of our tasks submit other tasks in turn, and multiple such parent tasks may be encountered within a single graph. Hence the waves of submitted futures saturating the cluster.
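For a rough picture of the pattern (task names here are illustrative), the nested submissions follow the standard `worker_client` idiom:

```python
from distributed import worker_client

def child_task(x):
    ...  # minutes to hours of real work

def parent_task(inputs):
    # A task that submits further tasks. The worker_client context secedes
    # this thread while it waits on children, so it doesn't hold a slot.
    with worker_client() as client:
        futures = client.map(child_task, inputs)
        return client.gather(futures)
```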
We're well aware this is not the intended use of Dask.
However, given:
- Disabling scheduler-side queuing is supported (`worker-saturation = inf`).
- Worker-side queuing is configurable to a degree, but floored at length 1 (`worker-saturation > 0`).
- Lower-level concepts like work stealing, FIFO timeouts, etc. are configurable (see the config sketch after this list).
...I would say a high-level concept like disabling worker queuing should also be configurable.
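To make that concrete, here is how the existing knobs are exercised today (the config keys are real; the commentary is mine):

```python
import dask

# Scheduler-side queuing can be disabled outright...
dask.config.set({"distributed.scheduler.worker-saturation": float("inf")})

# ...while any finite value is floored at roughly one slot per worker thread.
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})

# Lower-level behaviour such as work stealing is likewise configurable.
dask.config.set({"distributed.scheduler.work-stealing": False})
```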
> Your request is basically to block workers from taking tasks and then open that gate on the fly at some point in the future, is that right? Have you tried experimenting with this already? I feel like tasks will still get picked up in a FIFO way, and small tasks will be queued behind big ones.
I don't want to open the gate, so to speak, manually. I want the scheduler to hoard tasks that are ready, and only send them to a worker when that worker is truly free to process the task (notwithstanding seceded threads).
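In other words, the dispatch condition I'm after is roughly the following (illustrative only, not an existing hook):

```python
# Illustrative only: dispatch a ready task to a worker only when it has a
# genuinely idle thread; seceded/long-running threads don't count as busy.
def worker_is_truly_free(ws) -> bool:
    busy_threads = len(ws.processing) - len(ws.long_running)
    return busy_threads < ws.nthreads
```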
Thanks!