dask-jobqueue
cluster.adapt() and persist not working together
What happened: I am trying to run an adaptive SLURMCluster on HPC. However, workers that reach their --lifetime do not get shut down; instead they are killed by Slurm at the walltime, which eventually makes the program fail. In the MCVE below I was able to link this to dask persist; I have found no such link in my real code, which does not include persist but fails in the same way. Is this expected behaviour? If so, is it documented? I have found nothing on this...
Output
distributed.core - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.utils - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.core - ERROR - 'tcp://10.50.36.247:38525'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.247:38525'
distributed.core - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.utils - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.core - ERROR - 'tcp://10.50.35.139:35749'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.139:35749'
distributed.core - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.utils - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.core - ERROR - 'tcp://10.50.35.136:33745'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.136:33745'
distributed.core - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
distributed.utils - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
distributed.core - ERROR - 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.36.88:35604'
Traceback (most recent call last):
File "adapt_test.py", line 38, in <module>
X[i, :] = v+Y[i]
File "/work/envs/ddask/lib/python3.6/site-packages/dask/array/core.py", line 1340, in __array__
x = self.compute()
File "/work/envs/ddask/lib/python3.6/site-packages/dask/base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/work/envs/ddask/lib/python3.6/site-packages/dask/base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 1992, in gather
asynchronous=asynchronous,
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 833, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 340, in sync
raise exc.with_traceback(tb)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 324, in f
result[0] = yield future
File "/work/envs/ddask/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/client.py", line 1851, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('getitem-8e1d85b31db8991ce62dda67e46d9fd3', 5)", <Worker 'tcp://10.50.35.136:33745', name: SLURMCluster-1, memory: 0, processing: 2188>)
distributed.core - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.utils - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.core - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.utils - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.core - ERROR - 'tcp://10.50.38.59:36950'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.38.59:36950'
distributed.core - ERROR - 'tcp://10.50.39.55:36267'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.39.55:36267'
distributed.core - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.utils - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.core - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
distributed.utils - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
yield
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
distributed.core - ERROR - 'tcp://10.50.35.137:42630'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.137:42630'
distributed.core - ERROR - 'tcp://10.50.35.135:45181'
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 528, in handle_comm
result = await result
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 1813, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2770, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 597, in handle_stream
handler(**merge(extra, msg))
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/scheduler.py", line 2684, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.50.35.135:45181'
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/comm/core.py", line 288, in connect
timeout=min(intermediate_cap, time_left()),
File "/work/envs/ddask/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/spec.py", line 641, in close_clusters
cluster.close(timeout=10)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 104, in close
return self.sync(self._close, callback_timeout=timeout)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 183, in sync
return sync(self.loop, func, *args, **kwargs)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 340, in sync
raise exc.with_traceback(tb)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/utils.py", line 324, in f
result[0] = yield future
File "/work/envs/ddask/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/deploy/spec.py", line 407, in _close
await self.scheduler_comm.close(close_workers=True)
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
comm = await self.live_comm()
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/core.py", line 772, in live_comm
**self.connection_args,
File "/work/envs/ddask/lib/python3.6/site-packages/distributed/comm/core.py", line 310, in connect
) from active_exception
OSError: Timed out trying to connect to tcp://10.50.32.38:38193 after 10 s
Error log of one of the killed workers:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.50.35.139:46695'
distributed.worker - INFO - Start worker at: tcp://10.50.35.139:39474
distributed.worker - INFO - Listening to: tcp://10.50.35.139:39474
distributed.worker - INFO - dashboard at: 10.50.35.139:46304
distributed.worker - INFO - Waiting to connect to: tcp://10.50.32.38:38193
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 40.00 GB
distributed.worker - INFO - Local Directory: /scratch/b/x/dask-worker-space/worker-r7sc67zm
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.50.32.38:38193
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Closing worker gracefully: tcp://10.50.35.139:39474
distributed.worker - INFO - Comm closed
distributed.worker - ERROR - failed during get data with tcp://10.50.35.139:39474 -> tcp://10.50.35.199:37618
Traceback (most recent call last):
File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _handle_write
num_bytes = self.write_to_fd(self._write_buffer.peek(size))
File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/iostream.py", line 1148, in write_to_fd
return self.socket.send(data) # type: ignore
BrokenPipeError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py", line 1281, in get_data
compressed = await comm.write(msg, serializers=serializers)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 257, in write
convert_stream_closed_error(self, e)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
distributed.core - INFO - Lost connection to 'tcp://10.50.35.199:39418': in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2ab39158cda0>>, <Task finished coro=<Worker.close_gracefully() done, defined at /work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py:1179> exception=CommClosedError('in <closed TCP>: Stream is closed',)>)
Traceback (most recent call last):
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 187, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/work/xenvs/ddask/lib/python3.6/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/worker.py", line 1193, in close_gracefully
await self.scheduler.retire_workers(workers=[self.address], remove=False)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/core.py", line 883, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/core.py", line 666, in send_recv
response = await comm.read(deserializers=deserializers)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 202, in read
convert_stream_closed_error(self, e)
File "/work/xenvs/ddask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
slurmstepd: error: *** JOB 32249611 ON m CANCELLED AT 2021-09-09T10:46:05 DUE TO TIME LIMIT ***
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.50.35.139:46695'
distributed.nanny - INFO - Worker process 37778 was killed by signal 15
distributed.dask_worker - INFO - End worker
What you expected to happen: I expect workers that reach the specified lifetime to be shut down within a matter of seconds and then restarted, regardless of what my dask code does. Minimal Complete Verifiable Example:
# most of the code is from some other issue on here
import numpy as np
from dask_jobqueue import SLURMCluster as Cluster
import time
import dask.array as da
from dask.distributed import Client, as_completed

cluster = Cluster(walltime='00:02:00',
                  interface="ib0",
                  cores=1, memory='40gb',
                  extra=["--lifetime", "50s", "--lifetime-stagger", "5s"])
cluster.adapt(minimum=4, maximum=10)
client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(48000)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(24600)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1])
X = np.zeros((num_files, num_features), dtype=np.float32)
Y = da.random.random(X.shape)
Y = Y.persist()  # comment out this line and everything works as expected
for future in as_completed(client.map(features, list(enumerate(filenames)))):
    i, v = future.result()
    X[i, :] = v + Y[i]
Anything else we need to know?: I contacted the HPC IT team and they are not aware of any TCP issues/requirements.
Environment:
- Dask version: dask 2.25.0, dask-core 2.25.0, dask-jobqueue 0.7.3, distributed 2.30.1 (conda packages)
- Python version: 3.6.0
- Operating System: RHEL 6.10
- Install method (conda, pip, source): conda 4.10.3
Thanks @Wacken0013 for the detailed issue, and sorry not to get back to you earlier.
I have not had time to try your example yet to figure out what may be going wrong.
At first glance it looks like your worker fails to send the data it owns before shutting down, which prevents the process from stopping. And I confirm this is not the expected behavior.
First of all, I see the Dask and Python versions you're using are getting quite old. Is it possible for you to install a more up-to-date version and see if you still reproduce the problem?
Also, not related, but you seem to be using a shared file system for your workers' local directory (/scratch/b/x/dask-worker-space/worker-r7sc67zm), which is not recommended. Could you use something like /tmp or $TMPDIR if available?
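For example, a minimal sketch of pointing the worker local directory at node-local storage (the "$TMPDIR" fallback and the other constructor values are assumptions, not taken from your setup):

import os
from dask_jobqueue import SLURMCluster

# Put the worker local directory on node-local storage instead of shared /scratch.
cluster = SLURMCluster(
    cores=1,
    memory="40GB",
    walltime="00:02:00",
    local_directory=os.environ.get("TMPDIR", "/tmp"),  # node-local scratch, site-dependent
    extra=["--lifetime", "50s", "--lifetime-stagger", "5s"],
)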
Thanks for the reply @guillaumeeb. I probably won't be able to upgrade the versions significantly due to the old OS. Can you explain why using scratch is an issue? I can definitely change that; maybe it is also the source of my other dask problems :D
As @guillaumeeb mentioned that's a reasonably old Dask version and we no longer support Python 3.6.
However, I am wondering if this is a race condition between creating new workers, transferring state, and the walltime.
I see your walltime is two minutes and lifetime is 50 seconds. That leaves 70 seconds to schedule a new worker job, have it fulfilled, connect to the scheduler and migrate the state.
Are you seeing new jobs being created to replace the old ones? And are you seeing them connect to the scheduler in the scheduler log?
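To make that margin explicit, here is the arithmetic as a tiny sketch (numbers taken from the MCVE above):

# Headroom between the worker --lifetime and the Slurm walltime (MCVE numbers).
walltime_s = 2 * 60   # '00:02:00'
lifetime_s = 50       # --lifetime 50s
stagger_s = 5         # --lifetime-stagger 5s
headroom_s = walltime_s - (lifetime_s + stagger_s)
print(headroom_s)     # 65; roughly 65-70 s to queue a new job, connect it, and migrate state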
Can you explain why using scratch is an issue?
In my experience (though I have to admit I'm not really sure how each worker's local directory is used, and this may have evolved over time), specifying a shared directory as the base for the workers' local directories can lead to bad worker behavior. Workers compete to write in this shared directory and sometimes fail.
I probably won't be able to upgrade the versions significantly due to the old OS.
Yep, I did notice the old OS versions, but I didn't think it would prevent using something like Conda to upgrade to more up-to-date versions.
I see your walltime is two minutes and lifetime is 50 seconds. That leaves 70 seconds to schedule a new worker job, have it fulfilled, connect to the scheduler and migrate the state.
But @jacobtomlinson, do we agree that it is not normal for the worker to stay sort of alive for these 70 seconds? Ideally, it should migrate its state to other workers currently in the cluster. I really have to try the example...
The worker local directory should be placed on the fastest temporary storage possible. Shared storage is really not a good place for this.
Without digging deeper into the example I think it should be migrating to other workers, but perhaps due to memory, network or storage limitations this is taking longer than 70 seconds?
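One way to check that, as a rough sketch (not part of the original discussion; it assumes a running client with at least two workers), is to retire a worker by hand and time how long the scheduler needs to move its data:

import time

# Time a manual graceful retirement to measure the state-migration cost
# independently of --lifetime and the walltime.
workers = list(client.scheduler_info()["workers"])
start = time.time()
client.retire_workers(workers=workers[:1])  # scheduler moves its data elsewhere before closing it
print(f"retired {workers[0]} in {time.time() - start:.1f}s")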
Yep, I did notice the old OS versions, but I didn't think it would prevent using something like Conda to upgrade to more up-to-date versions.
No, it's a big issue on the machine. I have an environment based on Python 3.7 working now, but I have not had the time to test this specific issue again.
Are you seeing new jobs being created to replace the old ones? And are you seeing them connect to the scheduler in the scheduler log?
I have not checked the logs, but I didn't see new jobs being created in Slurm, if I recall correctly. 70s should be enough by a lot; I did try with hours too, and it didn't change anything. And really the question is why the migration should take prohibitively longer because of persist. I'll run the MCVE again in the new environment soon and update here.
And really the question is why the migration should take prohibitively longer because of persist.
The migration will take longer because the worker has all the persisted data in memory and needs to send it to other workers. But I agree it shouldn't take prohibitively longer.
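A quick way to see how much each worker would have to hand off (a sketch assuming the client and the persisted Y from the MCVE):

# List how many in-memory keys each worker currently holds; a retiring worker
# must transfer all of these to its peers before it can shut down.
has_what = client.has_what()   # worker address -> keys held in memory
for addr, keys in has_what.items():
    print(f"{addr}: {len(keys)} keys in memory")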
I tried the MCVE again in the new environment. Workers still get killed. However, I do see the creation of new workers when the old ones reach the 40s lifetime, so I probably just remembered wrong. But the old ones last until the Slurm walltime and get killed. The new environment uses dask 2021.4.1, dask-core 2021.4.1, dask-jobqueue 0.7.3 (conda-forge).
Stupid question: I also had a lot of trouble with adapt, so I would love to add a (failing) test to the GitHub CI to document this. Could the minimal example be rewritten without numpy (which would add an extra dependency), using plain Python data, marked with pytest.mark.xfail, and submitted as a PR? It would be great to have a test that requires migration.
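A rough, untested sketch of what such a test could look like (the xfail reason, the constructor values, and the use of scatter instead of persist are all assumptions; it documents the intent rather than faithfully reproducing the failure):

import pytest
from dask.distributed import Client
from dask_jobqueue import SLURMCluster


@pytest.mark.xfail(reason="workers holding in-memory data are not retired at --lifetime")
def test_adapt_lifetime_with_in_memory_data():
    # Plain Python data instead of numpy, to avoid an extra test dependency.
    with SLURMCluster(cores=1, memory="2GB", walltime="00:02:00",
                      extra=["--lifetime", "50s", "--lifetime-stagger", "5s"]) as cluster:
        cluster.adapt(minimum=1, maximum=4)
        with Client(cluster) as client:
            data = client.scatter(list(range(10_000)))      # keep data in worker memory
            futures = client.map(lambda x: x + 1, data)
            assert sum(client.gather(futures)) > 0
            # A complete test would also assert that no worker survives past --lifetime.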
Some info: I used
walltime: '00:30:00'
extra: ["--lifetime", "25m", "--lifetime-stagger", "1m"]
in SLURMCluster to avoid workers being killed at walltime.
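Spelled out as a constructor call, that is roughly the following (cores and memory here are placeholders, not the values actually used):

from dask_jobqueue import SLURMCluster

# A 25-minute lifetime inside a 30-minute walltime leaves a few minutes of headroom
# for replacement workers to start and receive the migrated state.
cluster = SLURMCluster(
    cores=1,
    memory="40GB",
    walltime="00:30:00",
    extra=["--lifetime", "25m", "--lifetime-stagger", "1m"],
)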
Hi @arndt-k,
I finally tried your MCVE, with six-month-old Dask versions:
dask 2022.2.0
dask-jobqueue 0.7.3
distributed 2022.2.0
python 3.9.104
On a RHEL 7 system with PBS.
I tried it several times and never got an error. The adaptive mechanism works as expected.
I first tried with only the Y.persist(), to check that the in-memory data was correctly sent back and forth to new workers, and then with the client.map and as_completed part. The workflow ran smoothly, even if it advanced slowly, especially with the really large filenames range (48000!!). At one point (2000 filenames processed) I just cancelled it, to avoid saturating the PBS server with new jobs every minute.
I relaunched it with 800 filenames, and also modified the Cluster constructor kwargs (more cores, less memory per worker), and it finished without a problem, after stopping and starting 3 'batches' of workers.
Here is the final version of the code:
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
import time
import dask.array as da
from dask.distributed import Client, as_completed

cluster = Cluster(walltime='00:02:00',
                  interface="ib0",
                  cores=4, memory='20GiB',
                  extra=["--lifetime", "50s", "--lifetime-stagger", "5s"])
cluster.adapt(minimum=4, maximum=10)
client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(600)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(24600)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1])
X = np.zeros((num_files, num_features), dtype=np.float32)
Y = da.random.random(X.shape)
Y = Y.persist()
for future in as_completed(client.map(features, list(enumerate(filenames)))):
    i, v = future.result()
    X[i, :] = v + Y[i]
So I'm going to close this issue, as the adaptive logic seems to be working in your case with more recent Dask, Python, and OS versions. Feel free to add more feedback here, and even reopen if you find it does not.