Worker restriction using alias doesn't resolve after worker restart
Describe the issue:
Worker restrictions specified via a worker's name can fail to be resolved if the named worker is restarted and `allow_other_workers=False`. I believe this is because `worker_restrictions` in `TaskState` records the output of `Scheduler.coerce_address`, which is most likely different after the worker is restarted. As a result, the worker restriction is never fulfilled, even though the restarted worker keeps the same name. Is this the intended behaviour?
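To make the suspected mechanism concrete, here is a minimal, hypothetical sketch in plain Python (not the actual `distributed` scheduler code; the `aliases` map and `coerce_worker_restriction` helper are illustrative stand-ins) of how resolving a name to an address once, at submission time, leaves a stale restriction behind after a restart:

```python
# Hypothetical sketch (not distributed's actual code): the scheduler keeps a
# name -> address mapping, and a worker restriction given by name is coerced
# to a concrete address once, at submission time.
aliases = {"worker-0": "tcp://127.0.0.1:40001"}

def coerce_worker_restriction(workers):
    # Stand-in for Scheduler.coerce_address: replace names with addresses.
    return {aliases.get(w, w) for w in workers}

restrictions = coerce_worker_restriction({"worker-0"})
print(restrictions)  # {'tcp://127.0.0.1:40001'}

# After a restart, the same name points at a fresh address ...
aliases["worker-0"] = "tcp://127.0.0.1:40017"

# ... but the stored restriction still holds the old address, so no running
# worker ever satisfies it and the task can never be scheduled again.
print(aliases["worker-0"] in restrictions)  # False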
Minimal Complete Verifiable Example:
```python
import time


def test_func(duration: int):
    time.sleep(duration)


def main():
    from dask.distributed import Scheduler, Nanny, Client, SpecCluster, as_completed

    scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
    workers = {
        'worker-0': {"cls": Nanny, "options": {"nthreads": 1}},
    }
    cluster = SpecCluster(scheduler=scheduler, workers=workers)
    client = Client(cluster)

    # Restrict the task to the worker named "worker-0".
    future = client.submit(test_func, 60, workers="worker-0", allow_other_workers=False)
    time.sleep(10)  # to ensure the task is submitted
    for key, task in cluster.scheduler.tasks.items():
        print("BEFORE RESTART", key, task.worker_restrictions, task.host_restrictions)

    # Restart the worker by name; the task's restriction still references
    # the old address, so it is never satisfied again and the program hangs.
    client.restart_workers(["worker-0"])
    for _ in as_completed([future]):
        pass


if __name__ == "__main__":
    main()
```
The output I get from the above example shows the scheduler and worker starting. The initial execution of the `test_func` task does not complete before the worker is restarted, after which the program hangs because the worker restriction is never fulfilled. The output of the print statement shows that the worker restriction in the task state records the address of the initial worker.
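For completeness, here is a small diagnostic sketch (assuming the same in-process `cluster` handle as in the example above, and that the scheduler's `WorkerState` objects expose `name` and `address`) that could be appended after the `restart_workers` call to confirm the stored restriction no longer matches any live worker:

```python
# Diagnostic sketch: map each current worker's name to its (new) address ...
current = {ws.name: ws.address for ws in cluster.scheduler.workers.values()}
print("workers after restart:", current)

# ... and compare against the restriction stored on each TaskState.
for key, task in cluster.scheduler.tasks.items():
    print("AFTER RESTART", key, task.worker_restrictions)
    stale = (task.worker_restrictions or set()) - set(current.values())
    print("stale restriction entries:", stale)
```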
Environment:
- Dask version: 2024.2.0
- Python version: 3.10
- Operating System: Linux
- Install method (conda, pip, source): pip