dask-jobqueue
Handshake timeout error on SGE cluster
from dask_jobqueue import SGECluster
from dask.distributed import Client, as_completed
import numpy as np
import blm_setup, blm_batch, blm_concat, blm_cleanup # my own functions which don't employ dask

# Given a dask distributed client run BLM.
def main(cluster, client):

    # Some generic inputs
    inputs = ... # Function inputs

    # Ask for a node for setup
    cluster.scale(1)

    # Setup does some computation to work out how many jobs to submit
    future_0 = client.submit(blm_setup, inputs, pure=False)
    nb = future_0.result()

    # Ask for 100 workers for next stage of computation
    cluster.scale(100)

    # Empty futures list
    futures = []

    # Run jobs
    for b in (np.arange(nb) + 1):

        # Individual job
        future_b = client.submit(blm_batch, b, inputs, pure=False)

        # Append to list
        futures.append(future_b)

    # Completed jobs
    completed = as_completed(futures)

    # Wait for results
    for future_b in completed:
        future_b.result()

    # Ask for 1 node for final stage
    cluster.scale(1)

    # Concatenation job
    future_concat = client.submit(blm_concat, inputs, pure=False)

    # Wait for concatenation job
    future_concat.result()

    # Cleanup job
    future_cleanup = client.submit(blm_cleanup, inputs, pure=False)

    # Wait for cleanup job
    future_cleanup.result()

# If running this script directly
if __name__ == "__main__":

    # Specify cluster setup
    cluster = SGECluster(cores=1,
                         memory="100GB",
                         queue='short.qc',
                         walltime='00:30:00',
                         extra=['--no-dashboard'],
                         interface="ib0",
                         local_directory='FILEPATH') # I have removed the filepath from the original code here.

    # Connect to cluster
    client = Client(cluster)

    # Run code
    main(cluster, client)

    # Close the client
    client.close()
(Source: https://github.com/TomMaullin/BLM/tree/dask_update)
What happened: I ran the above code (with FILEPATH replaced appropriately) on the head node of an SGE cluster using python blm_cluster.py & (where blm_cluster.py contains the above code). First, it gave me 1 worker and the setup computation seemed to run correctly, then it gave me 100 workers and they seemed to run correctly. Finally, it went back down to 1 worker. However, at this stage the following error appeared in the log for the one remaining worker:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.128.10.26:38442'
distributed.worker - INFO - Start worker at: tcp://10.128.10.26:34295
distributed.worker - INFO - Listening to: tcp://10.128.10.26:34295
distributed.worker - INFO - dashboard at: 10.128.10.26:38150
distributed.worker - INFO - Waiting to connect to: tcp://10.128.1.241:34386
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 93.13 GiB
distributed.worker - INFO - Local Directory: /well/nichols/users/inf852/BLMdask/dask-worker-space/worker-b_q5vip7
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.128.1.241:34386
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b87ac5f4450>>, <Task finished coro=$
Traceback (most recent call last):
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/comm/core.py", line 326, in connect
handshake = await asyncio.wait_for(comm.read(), time_left())
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/worker.py", line 1370, in heartbeat
raise e
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/worker.py", line 1336, in heartbeat
for key in self.active_keys
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
operation=operation,
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/core.py", line 902, in send_recv_from_rpc
comm = await self.pool.connect(self.addr)
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/core.py", line 1072, in connect
comm = await fut
File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/comm/core.py", line 333, in connect
) from exc
OSError: Timed out during handshake while connecting to tcp://10.128.1.241:34386 after 30 s
From this point onwards the final worker neither terminates nor makes any progress. qstat reports it as running, but it does not appear to be doing anything.
What you expected to happen: I expected the first worker to run my blm_setup code, then 100 workers to run my blm_batch code, and finally one worker to run my blm_concat code, which combines the files created by blm_batch.
Minimal Complete Verifiable Example: I'm not sure how to reduce the above any further.
Environment:
- Dask version: dask==2022.2.0, dask-jobqueue==0.7.3
- Python version: 3.7.11
- Operating System: CentOS Linux 7
- Install method (conda, pip, source): pip inside a conda environment
I am facing the same problem. It only occurs on some nodes (~50%) of my cluster, and the others seem to work fine. I can submit jobs to these same nodes with qsub without any trouble, so I know it is not a network issue.
It's hard to say what the issue might be from this.
Sometimes there are restrictions on an institution's clusters: for example, some ports might be disallowed, or there could be constraints placed on the processes themselves.
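If blocked ports are a suspicion, one way to test is to pin the scheduler and workers to a range known to be open. This is only a hedged sketch: the port numbers, and the assumption that your installed distributed version accepts port ranges for --worker-port/--nanny-port, are illustrative and not taken from the original report.
from dask_jobqueue import SGECluster

# Pin the scheduler to a fixed port and restrict worker/nanny ports to
# ranges the cluster firewall is known to allow (port numbers are examples).
cluster = SGECluster(
    cores=1,
    memory="100GB",
    scheduler_options={"port": 8786, "dashboard_address": ":8787"},
    extra=["--worker-port", "51000:51100", "--nanny-port", "51100:51200"],
)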
Given this is around using Dask-Jobqueue and SGE, I think there might be better advice from folks on that repo's issue tracker, so I'm going to move this over there.
OK, first, regarding @TomMaullin's problem:
It is a bit weird that everything is OK with one hundred workers and it fails when falling back to only one. Are you sure the error appears only after the scale-down is called? If so, this might be a problem of future objects still held in distributed memory (and rebalancing). Or maybe the last function call is trying to move some big data to or from the Scheduler; you should try to follow along with the Dashboard.
Some other considerations/suggestions:
- Do you really need to scale up and down? Are the pre- and post-processing functions long enough to justify this?
- You could also execute these functions in the same process as the Client, and only use Dask for the distributed part.
- Otherwise, I would have used delayed to generate this workflow with Dask, but then you won't be able to scale manually between steps (see the sketch after this list).
- I'm not sure what you're doing with your future_b.result() calls?
- local_directory='FILEPATH': this is weird. local_directory should be a temporary directory where Dask stores its scratch files. You should ideally point it at a compute-node-local directory (/tmp, or '$TMPDIR').
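For illustration, here is a minimal sketch of the delayed-based workflow mentioned above. The blm_* functions below are hypothetical placeholders (note that blm_concat is handed the batch results purely to encode the dependency); this is not the author's actual code.
import numpy as np
import dask

# Hypothetical placeholders for the user's own functions.
def blm_setup(inputs): ...
def blm_batch(b, inputs): ...
def blm_concat(batch_results, inputs): ...

def run_with_delayed(inputs):
    # blm_setup must run eagerly (e.g. in the client process) because its
    # result determines how many batch tasks to build.
    nb = blm_setup(inputs)

    # Build one lazy task per batch; nothing executes yet.
    batches = [dask.delayed(blm_batch)(b, inputs) for b in np.arange(nb) + 1]

    # Passing the batch results into blm_concat encodes the dependency, so
    # concatenation only runs once every batch task has finished.
    result = dask.delayed(blm_concat)(batches, inputs)

    # Triggers the whole graph on whatever cluster the Client is attached to.
    return result.compute()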
@desh2608:
Your problem seems much more general: if I understand correctly, on your cluster, Dask workers fail on about half of the nodes?
If so, this is either a networking problem or an overloaded Scheduler (how many Dask workers are you launching at the same time? Did you try to increase timeouts?). Does manually submitting a job that launches a Dask worker connecting to your Scheduler work on these nodes? Are those always the same nodes that cause issues?
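For reference, here is a hedged sketch of raising the connection/handshake timeout above the 30 s default seen in the error; the 60 s value is an arbitrary example, and env_extra is used on the assumption that the workers need the setting exported in their generated job scripts.
import dask
from dask_jobqueue import SGECluster

# Raise the comm timeouts for the local (client/scheduler) process.
dask.config.set({"distributed.comm.timeouts.connect": "60s",
                 "distributed.comm.timeouts.tcp": "60s"})

# Workers read their own configuration, so also export the setting in the
# job script that dask-jobqueue generates for each worker.
cluster = SGECluster(
    cores=1,
    memory="100GB",
    env_extra=["export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=60s",
               "export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=60s"],
)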
I started by trying to use 60 workers, but most of them (~75%) were failing. On reducing to 20 workers, I could get most of them to run. I also tried increasing the timeout to 300 s, but it helped only slightly. And it's not always the same nodes that cause the nanny timeout, so I don't think it is a node issue.
Manually submitting jobs with qsub works fine, even for as many as 80 jobs.
@desh2608:
- First, on the workers that are failing, do you see the same error message as @TomMaullin?
- In the Scheduler logs, what are you seeing?
- This might mean that your workers are slow to start; two points to look at:
- On which file system is your Python environment installed? Is it fast?
- Do you use the local_directory kwarg to put the workers' local working directory on local storage like /tmp? (See the example after this list.)
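For the second point, a hedged example of pointing local_directory at node-local scratch space; the exact path is site-specific, so treat '$TMPDIR' as a placeholder for whatever your scheduler provides.
from dask_jobqueue import SGECluster

# Keep Dask's scratch/spill files on node-local storage instead of a shared
# network filesystem; '$TMPDIR' is expanded on the compute node when the
# generated job script runs.
cluster = SGECluster(
    cores=1,
    memory="100GB",
    local_directory="$TMPDIR",  # or "/tmp"
)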
@desh2608 @TomMaullin do you still face your issues? Without more input from you in the following weeks, I'm going to close this issue.
Sorry, I switched to using qsub instead since I had deadlines to meet. As far as I can recall, I was seeing the same error as reported by @TomMaullin. You can close this issue for now. I will reopen it later with details if I face the problem again on a different project. Thanks for your help!
I'm closing the issue since there is no answer from @TomMaullin. Feel free to reopen if you have new experiments to add.
Apologies, I must have missed this response; I am no longer facing this issue as our system changed from SGE to SLURM.