
cluster.adapt() and persist not working together

Open arndt-k opened this issue 3 years ago • 10 comments

What happened: I am trying to run an adaptive SLURMCluster on HPC. However, workers that reach the --lifetime are not shut down gracefully; instead they are killed by Slurm at walltime, which eventually makes the whole program fail. In the MVE below I was able to link this to dask persist; my real code does not call persist but fails in the same way, and I have found no such link there. Is this expected behaviour? If so, is it documented? I have found nothing on this...

Output


distributed.core - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.utils - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.core - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.core - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.utils - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.core - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.core - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.utils - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.core - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.core - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
distributed.utils - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
distributed.core - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
  File "adapt_test.py", line 38, in <module>
    X[i, :] = v+Y[i]
  File "/work/envs/ddask/lib/python3.6/site-packages/dask/array/core.py", line 1340, in __array__
    x = self.compute()
  File "/work/envs/ddask/lib/python3.6/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/work/envs/ddask/lib/python3.6/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 1992, in gather
    asynchronous=asynchronous,
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 833, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
  File "/work/envs/ddask/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 1851, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('getitem-8e1d85b31db8991ce62dda67e46d9fd3', 5)", <Worker 'tcp://10.50.35.136:33745', name: SLURMCluster-1, memory: 0, processing: 2188>)
distributed.core - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.utils - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.core - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.utils - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.core - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.core - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.core - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.utils - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.core - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
distributed.utils - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
distributed.core - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.core - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
    handler(**merge(extra, msg))
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/comm/core.py", line 288, in connect
    timeout=min(intermediate_cap, time_left()),
  File "/work/envs/ddask/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/spec.py", line 641, in close_clusters
    cluster.close(timeout=10)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 104, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 183, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
  File "/work/envs/ddask/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/spec.py", line 407, in _close
    await self.scheduler_comm.close(close_workers=True)
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 772, in live_comm
    **self.connection_args,
  File "/work/envs/ddask/lib/python3.6/site-packages/distributed/comm/core.py", line 310, in connect
    ) from active_exception
OSError: Timed out trying to connect to tcp://10.50.32.38:38193 after 10 s

Error log of one of the killed workers:


distributed.nanny - INFO -         Start Nanny at: 'tcp://10.50.35.139:46695'
distributed.worker - INFO -       Start worker at:   tcp://10.50.35.139:39474
distributed.worker - INFO -          Listening to:   tcp://10.50.35.139:39474
distributed.worker - INFO -          dashboard at:         10.50.35.139:46304
distributed.worker - INFO - Waiting to connect to:    tcp://10.50.32.38:38193
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   40.00 GB
distributed.worker - INFO -       Local Directory: /scratch/b/x/dask-worker-space/worker-r7sc67zm
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://10.50.32.38:38193
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Closing worker gracefully: tcp://10.50.35.139:39474
distributed.worker - INFO - Comm closed
distributed.worker - ERROR - failed during get data with tcp://10.50.35.139:39474 -> tcp://10.50.35.199:37618
Traceback (most recent call last):
  File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/iostream.py", line 1148, in write_to_fd
    return self.socket.send(data)  # type: ignore
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py", line 1281, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 257, in write
    convert_stream_closed_error(self, e)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    ) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
distributed.core - INFO - Lost connection to 'tcp://10.50.35.199:39418': in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2ab39158cda0>>, <Task finished coro=<Worker.close_gracefully() done, defined at /work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py:1179> exception=CommClosedError('in <closed TCP>: Stream is closed',)>)
Traceback (most recent call last):
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py", line 1193, in close_gracefully
    await self.scheduler.retire_workers(workers=[self.address], remove=False)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/core.py", line 883, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/core.py", line 666, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
slurmstepd: error: *** JOB 32249611 ON m CANCELLED AT 2021-09-09T10:46:05 DUE TO TIME LIMIT ***
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.50.35.139:46695'
distributed.nanny - INFO - Worker process 37778 was killed by signal 15
distributed.dask_worker - INFO - End worker

What you expected to happen: I expect workers that reach the specified lifetime to be shut down within a matter of seconds and then restarted, regardless of what my Dask code does.

Minimal Complete Verifiable Example:


#most of the code is from some other issue on here
import numpy as np
from dask_jobqueue import SLURMCluster as Cluster
import time
import dask.array as da
from dask.distributed import Client, as_completed


cluster = Cluster(walltime='00:02:00',
                  interface="ib0",
                  cores=1, memory='40gb',
                  extra=["--lifetime", "50s",
                         "--lifetime-stagger", "5s"])
cluster.adapt(minimum=4, maximum=10)


client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(48000)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(24600)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) 

X = np.zeros((num_files, num_features), dtype=np.float32)
Y = da.random.random(X.shape)
Y = Y.persist()  # comment out this line and everything works as expected

for future in as_completed(client.map(features, list(enumerate(filenames)))): 
    i, v = future.result()
    X[i, :] = v+Y[i]

Anything else we need to know?: I contacted HPC IT and they are not aware of any TCP issues/requirements.

Environment:

  • Dask version: dask 2.25.0 py_0 conda-forge, dask-core 2.25.0 py_0 conda-forge, dask-jobqueue 0.7.3 pyhd8ed1ab_0 conda-forge, distributed 2.30.1 py36h06a4308_0
  • Python version: 3.6.0
  • Operating System: RHEL 6.10
  • Install method (conda, pip, source): conda 4.10.3

arndt-k avatar Sep 09 '21 10:09 arndt-k

Thanks @Wacken0013 for the detailed issue, and sorry for not getting back to you earlier.

I have not had the time to try your example yet to figure out what may be going wrong.

At first glance it looks like your worker fails to send the data it owns before shutting down, which prevents the process from stopping. And I confirm this is not the expected behavior.

First of all, I see the Dask (and Python) versions you're using are getting quite old. Is it possible for you to install a more up-to-date version and see if you can still reproduce the problem?

Also not related, but you seem to be using a shared file system for your workers' local directory (/scratch/b/x/dask-worker-space/worker-r7sc67zm), which is not recommended. Could you use something like /tmp or '$TMPDIR' if available?
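
For illustration, a minimal sketch of what that could look like (an assumption on my side, not tested on your system; local_directory is forwarded to the worker's --local-directory option, and "$TMPDIR" would be expanded by the shell inside the generated job script):

from dask_jobqueue import SLURMCluster

# Sketch only: point each worker's scratch space at node-local storage
# instead of the shared /scratch file system.
cluster = SLURMCluster(
    walltime="00:02:00",
    interface="ib0",
    cores=1,
    memory="40GB",
    local_directory="$TMPDIR",  # or "/tmp" if TMPDIR is not defined on the nodes
    extra=["--lifetime", "50s", "--lifetime-stagger", "5s"],
)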

guillaumeeb avatar Oct 09 '21 15:10 guillaumeeb

Thanks for the reply @guillaumeeb. I probably won't be able to upgrade the versions significantly, due to the old OS. Can you explain why using scratch is an issue? I can definitely change that; maybe it is also the source of my other Dask problems :D

arndt-k avatar Oct 10 '21 09:10 arndt-k

As @guillaumeeb mentioned, that's a reasonably old Dask version, and we no longer support Python 3.6.

However, I am wondering if this is a race condition between creating new workers, transferring state, and the walltime.

I see your walltime is two minutes and the lifetime is 50 seconds. That leaves 70 seconds to schedule a new worker job, have it fulfilled, connect it to the scheduler and migrate the state.

Are you seeing new jobs being created to replace the old ones? And are you seeing them connect to the scheduler in the scheduler log?
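
One way to check from the driver process (a sketch, assuming the cluster object is still in scope; get_logs() comes from distributed's generic Cluster class, so availability may depend on your version):

# Sketch: inspect the submitted job script and the scheduler/worker logs to
# see whether replacement workers ever register with the scheduler.
print(cluster.job_script())   # the Slurm batch script dask-jobqueue submits

for name, text in cluster.get_logs().items():
    print(f"--- {name} ---")
    print(text)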

jacobtomlinson avatar Oct 12 '21 15:10 jacobtomlinson

Can you explain why using scratch is an issue?

In my experience (though I have to admit I'm not entirely sure how each worker's local directory is used, and this may have evolved over time), using a shared directory as the base for the workers' local directories can lead to bad worker behavior: workers compete to write into this shared directory and sometimes fail.

I probably wont be able to upgrade the versions significantly, due to the old OS.

Yep, I did notice the old OS version, but I didn't think it would prevent using something like Conda to upgrade to more up-to-date versions.

I see your walltime is two minutes and lifetime is 50 seconds. That leaves 70 seconds to schedule a new worker job, have it fulfilled, connect to the scheduler and migrate the state.

But @jacobtomlinson, do we agree that it is not normal for the worker to stay sort of alive for these 70 seconds? Ideally, it should migrate its state to the other workers currently in the cluster. I really have to try the example...

guillaumeeb avatar Oct 13 '21 18:10 guillaumeeb

The worker local directory should be placed on the fastest temporary storage possible. Shared storage is really not a good place for this.

Without digging deeper into the example I think it should be migrating to other workers, but perhaps due to memory, network or storage limitations this is taking longer than 70 seconds?

jacobtomlinson avatar Oct 14 '21 12:10 jacobtomlinson

Yep, I did notice the old OS versions, but I didn't think it would prevent using something like Conda to upgrade to more up to date versions.

No, it's a big issue on the machine. I have now got an environment based on Python 3.7 working, but I have not had the time to test this specific issue again.

Are you seeing new jobs being created to replace the old ones? And are you seeing them connect to the scheduler in the scheduler log?

I have not checked the logs, but if I recall correctly I didn't see new jobs being created in Slurm. 70 s should be more than enough. I did try with hours too; it didn't change anything. And really the question is why the migration should take prohibitively longer because of persist. I'll run the MVE again in the new environment soon and update here.

arndt-k avatar Oct 14 '21 15:10 arndt-k

And really the question is why the migration should take prohibitively longer because of persist.

The migration will take longer because the worker holds all the persisted data in memory and needs to send it to other workers. But I agree it shouldn't be prohibitively longer.
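
For a rough sense of scale (my own back-of-the-envelope estimate, not a measurement from the thread): the persisted array in the MVE is 48000 x 24600 float64 values, so the retiring workers collectively have several gigabytes to hand off.

# Rough size estimate of the persisted array in the MVE.
# da.random.random defaults to float64, i.e. 8 bytes per element.
rows, cols, itemsize = 48_000, 24_600, 8
nbytes = rows * cols * itemsize
print(f"{nbytes / 1e9:.1f} GB")  # ~9.4 GB in total, spread across the workers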

jacobtomlinson avatar Oct 15 '21 10:10 jacobtomlinson

I tried the MVE again in the new environment. Workers still get killed. However, I do see new workers being created when the old ones reach the 40s lifetime, so I probably just remembered wrong. But the old ones last until the Slurm walltime and get killed. The new environment uses dask 2021.4.1 pyhd8ed1ab_0 conda-forge, dask-core 2021.4.1 pyhd8ed1ab_0 conda-forge, dask-jobqueue 0.7.3 pyhd8ed1ab_0 conda-forge.

arndt-k avatar Oct 25 '21 15:10 arndt-k

Stupid question: I have also had a lot of trouble with adapt, so I would love to add a (failing) test to the GitHub CI to document this. Could the minimal example (with pytest.mark.xfail) be rewritten without NumPy, using plain arrays, and turned into a PR? (Requiring NumPy would add an extra dependency.) It would be great to have a test that requires migration.
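
For illustration only, a hypothetical sketch of what such a test could look like without NumPy; the cluster fixture, the xfail reason and the sizes are assumptions on my part, not part of the existing dask-jobqueue test suite:

import time

import pytest
from dask.distributed import Client, as_completed


@pytest.mark.xfail(reason="workers holding persisted data outlive --lifetime")
def test_adapt_with_persisted_data(cluster):  # hypothetical SLURMCluster fixture
    cluster.adapt(minimum=2, maximum=4)
    with Client(cluster) as client:
        # Scatter plain Python lists instead of a NumPy-backed dask array,
        # so retiring workers still have in-memory data to migrate.
        persisted = client.scatter([list(range(100_000)) for _ in range(50)])

        def work(i):
            time.sleep(1)  # simulate per-item work
            return i

        futures = client.map(work, range(200))
        results = [future.result() for future in as_completed(futures)]
        assert sorted(results) == list(range(200))
        del persisted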

riedel avatar Nov 27 '21 19:11 riedel

Some info: I used

walltime: '00:30:00'
extra: ["--lifetime", "25m", "--lifetime-stagger", "1m"]

in SLURMCluster to avoid workers being killed at walltime.
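
As a sketch of how that reads in code (my interpretation, not the exact snippet used above): leave a generous buffer between --lifetime plus --lifetime-stagger and the Slurm walltime, so workers have time to retire and migrate their data before the job limit hits.

from dask_jobqueue import SLURMCluster

# Sketch: workers retire after roughly 24-26 minutes, leaving a ~4-5 minute
# buffer before the 30 minute Slurm walltime is enforced.
cluster = SLURMCluster(
    walltime="00:30:00",
    cores=1,
    memory="40GB",
    extra=["--lifetime", "25m", "--lifetime-stagger", "1m"],
)
cluster.adapt(minimum=4, maximum=10)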

alisterburt avatar Jan 31 '22 14:01 alisterburt

Hi @arndt-k,

I finally tried your MCVE, with 6-month-old Dask versions:

dask                      2022.2.0
dask-jobqueue             0.7.3
distributed               2022.2.0
python                    3.9.104

On a RHEL 7 system with PBS.

I tried it several times and never got an error. The adaptive mechanism works as expected.

I tried with only the Y.persist(), to check that the in-memory data was correctly sent back and forth to new workers, and then with the client.map and as_completed part. The workflow ran smoothly, even if it advanced slowly, especially with the really large filenames range (48000!!). At one point (2000 filenames processed), I just cancelled it to avoid saturating the PBS server with new jobs every minute.

I relaunched it with 800 filenames, and also modified the Cluster constructor kwargs (more cores, less memory per worker), and it finished without a problem, after stopping and starting 3 batches of workers.

Here is the final version of the code:

import numpy as np
from dask_jobqueue import PBSCluster as Cluster
import time
import dask.array as da
from dask.distributed import Client, as_completed

cluster = Cluster(walltime='00:02:00',
                  interface="ib0",
                  cores=4, memory='20GiB',
                  extra=["--lifetime", "50s",
                         "--lifetime-stagger", "5s"])
cluster.adapt(minimum=4, maximum=10)

client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(600)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(24600)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) 

X = np.zeros((num_files, num_features), dtype=np.float32)
Y = da.random.random(X.shape)

Y = Y.persist()

for future in as_completed(client.map(features, list(enumerate(filenames)))): 
    i, v = future.result()
    X[i, :] = v+Y[i]

So I'm going to close this issue, as the adaptive logic seems to be working in your case with a more recent Dask, Python and OS version. Feel free to add more feedback here, and even reopen it if you find that it does not.

guillaumeeb avatar Sep 06 '22 08:09 guillaumeeb