
Handshake timeout error on SGE cluster


import numpy as np
from dask_jobqueue import SGECluster
from dask.distributed import Client, as_completed
import blm_setup, blm_batch, blm_concat, blm_cleanup  # my own functions which don't employ dask

# Given a dask distributed client run BLM.
def main(cluster, client):

    # Some generic inputs
    inputs = ...  # Function inputs

    # Ask for a node for setup
    cluster.scale(1)

    # Setup does some computation to work out how many jobs to submit
    future_0 = client.submit(blm_setup, inputs, pure=False)
    nb = future_0.result()

    # Ask for 100 workers for next stage of computation
    cluster.scale(100)

    # Empty futures list
    futures = []

    # Run jobs
    for b in (np.arange(nb)+1):

        # Individual job
        future_b = client.submit(blm_batch, b, inputs, pure=False)
        
        # Append to list
        futures.append(future_b)

    # Completed jobs
    completed = as_completed(futures)

    # Wait for results
    for future_b in completed:
        future_b.result()

    # Ask for 1 node for final stage
    cluster.scale(1)

    # Submit concatenation job
    future_concat = client.submit(blm_concat, inputs, pure=False)

    # Wait for the concatenation job to finish
    future_concat.result()

    # Submit cleanup job
    future_cleanup = client.submit(blm_cleanup, inputs, pure=False)

    # Wait for the cleanup job to finish
    future_cleanup.result()

# If running as a script
if __name__ == "__main__":

    # Specify cluster setup
    cluster = SGECluster(cores=1,
                         memory="100GB",
                         queue='short.qc',
                         walltime='00:30:00',
                         extra=['--no-dashboard'],
                         interface="ib0",
                         local_directory='FILEPATH') # I have removed the filepath from the original code here. 

    # Connect to cluster
    client = Client(cluster)   

    # Run code
    main(cluster, client)

    # Close the client
    client.close()

(Source: https://github.com/TomMaullin/BLM/tree/dask_update)

What happened: I ran the above code (with the FILEPATH replaced appropriately) on the head node of an SGE cluster using python blm_cluster.py & (where blm_cluster.py contains the above code). First it gave me 1 worker and the setup computation appeared to run correctly; then it gave me 100 workers, which also appeared to run correctly. Finally it scaled back down to 1 worker. At this stage, however, the following error appeared in the log of the one remaining worker:

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.128.10.26:38442'
distributed.worker - INFO -	  Start worker at:   tcp://10.128.10.26:34295
distributed.worker - INFO -          Listening to:   tcp://10.128.10.26:34295
distributed.worker - INFO -          dashboard at:         10.128.10.26:38150
distributed.worker - INFO - Waiting to connect to:   tcp://10.128.1.241:34386
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  93.13 GiB
distributed.worker - INFO -	  Local Directory: /well/nichols/users/inf852/BLMdask/dask-worker-space/worker-b_q5vip7
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.128.1.241:34386
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b87ac5f4450>>, <Task finished coro=$
Traceback (most recent call last):
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/comm/core.py", line 326, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/worker.py", line 1370, in heartbeat
    raise e
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/worker.py", line 1336, in heartbeat
    for key in self.active_keys
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/core.py", line 902, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/core.py", line 1072, in connect
    comm = await fut
  File "/users/nichols/inf852/miniconda3/envs/BLM/lib/python3.7/site-packages/distributed/comm/core.py", line 333, in connect
    ) from exc
OSError: Timed out during handshake while connecting to tcp://10.128.1.241:34386 after 30 s

From this point onwards the final worker neither terminates nor makes any progress: qstat still reports it as running, but it does not appear to be doing anything.

What you expected to happen: I expected the first worker to run my blm_setup code, then 100 workers to run my blm_batch code, and finally one worker to run my blm_concat code, which combines the files created by blm_batch.

Minimal Complete Verifiable Example: I'm not sure how to reduce the above any further.
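
(For reference, a stripped-down script along the following lines might serve as a reduced test of whether the scale-down itself triggers the error; it keeps the same cluster options as above but replaces the BLM functions with trivial tasks and asks for much less memory per worker:)

```python
# Hypothetical reduced test: trivial tasks, same scaling pattern as the report.
import numpy as np
from dask_jobqueue import SGECluster
from dask.distributed import Client, as_completed

def noop(b):
    return b

if __name__ == "__main__":
    cluster = SGECluster(cores=1, memory="4GB", queue='short.qc',
                         walltime='00:30:00', interface="ib0")
    client = Client(cluster)

    cluster.scale(100)
    futures = [client.submit(noop, b, pure=False) for b in np.arange(100) + 1]
    for f in as_completed(futures):
        f.result()

    # The handshake error appeared after this scale-down in the original run.
    cluster.scale(1)
    print(client.submit(noop, 0, pure=False).result())

    client.close()
```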

Environment:

  • Dask version: dask==2022.2.0, dask-jobqueue==0.7.3
  • Python version: 3.7.11
  • Operating System: CentOS Linux 7
  • Install method (conda, pip, source): pip inside a conda environment

TomMaullin avatar Feb 21 '22 19:02 TomMaullin

I am facing the same problem. It only occurs on some nodes (~50%) of my cluster; others seem to work fine. I can submit jobs to these same nodes with qsub without issue, so I know it is not a network problem.

desh2608 avatar Mar 21 '22 15:03 desh2608

It's hard to say what the issue might be from this.

Sometimes there are restrictions in an institution's clusters. For example, some ports might be disallowed, or there could be constraints placed on the processes themselves.

Given this is around using Dask-Jobqueue and SGE, I think there might be better advice from folks on that repo/issue tracker, so I'm going to move this over there.

jakirkham avatar Apr 09 '22 02:04 jakirkham

OK, first, @TomMaullin's problem:

It is a bit weird that everything is OK with one hundred workers, yet it fails when falling back to only one. Are you sure the error appears only after the scale-down is called? If so, this might be a problem of future objects still held in distributed memory (and rebalancing), or maybe the last function call is transferring some big data to or from the Scheduler; you should try to follow along with the Dashboard.
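
If the futures hypothesis is right, one thing worth trying (a sketch against the posted main(), untested) is dropping every reference to the batch futures before calling cluster.scale(1), so the Scheduler can forget their results first:

```python
# As in the posted script, wait for every batch result...
completed = as_completed(futures)
for future_b in completed:
    future_b.result()

# ...then drop all references to the batch futures so the Scheduler can
# forget the corresponding results before the cluster shrinks.
del completed, futures

# Only now scale back down to one worker
cluster.scale(1)
```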

Some other considerations/suggestions:

  • Do you really need to scale up and down? Are the pre- and post-processing functions long enough to warrant it?
  • You could also execute these functions in the same process as the Client, and only use Dask when the work is actually distributed (see the sketch after this list).
  • Otherwise, I would have used delayed to build this workflow with Dask, but then you would not be able to scale manually between steps.
  • I'm not sure what you are doing with your future_b.result()?
  • local_directory='FILEPATH': this is odd. local_directory should be a temporary directory where Dask stores its scratch data; you should ideally point it at a compute-node-local directory (/tmp, or '$TMPDIR').
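
As a rough sketch of the second point (not the poster's actual code; blm_setup, blm_batch, blm_concat, blm_cleanup and inputs are assumed to be the same objects as in the report), the pre- and post-processing could simply run in the client process, keeping the cluster for the batch stage only:

```python
import numpy as np
from dask_jobqueue import SGECluster
from dask.distributed import Client, as_completed

# blm_setup, blm_batch, blm_concat, blm_cleanup and inputs as in the report.

if __name__ == "__main__":
    cluster = SGECluster(cores=1, memory="100GB", queue='short.qc',
                         walltime='00:30:00', interface="ib0")
    client = Client(cluster)

    # Setup runs locally in the client process; no worker needed.
    nb = blm_setup(inputs)

    # Only the embarrassingly parallel batch stage goes to the cluster.
    cluster.scale(100)
    futures = [client.submit(blm_batch, b, inputs, pure=False)
               for b in np.arange(nb) + 1]
    for future_b in as_completed(futures):
        future_b.result()
    del futures

    # Concatenation and cleanup also run locally once the batches are done.
    cluster.scale(0)
    blm_concat(inputs)
    blm_cleanup(inputs)

    client.close()
    cluster.close()
```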

@desh2608:

Your problem seems much more general: if I understand correctly, on your cluster, Dask workers fail on about half of the nodes?

If so, this is either a networking problem or an overloaded Scheduler (how many Dask workers are you launching at the same time? Did you try increasing the timeouts?). Does manually submitting a job that launches a Dask worker connecting to your Scheduler work on these nodes? Are they always the same nodes that cause issues?
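
On increasing the timeouts: the 30 s in the traceback is the connect timeout, which can be raised through Dask's configuration before the cluster and client are created. A minimal sketch (the 120 s values are arbitrary):

```python
import dask

# Give slow-starting workers more time to complete the handshake with the
# Scheduler; the traceback above shows the default 30 s being hit.
dask.config.set({
    "distributed.comm.timeouts.connect": "120s",
    "distributed.comm.timeouts.tcp": "120s",
})
```

The same keys can also be set persistently in a Dask configuration file (e.g. ~/.config/dask/distributed.yaml).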

guillaumeeb avatar Apr 11 '22 12:04 guillaumeeb


I started by trying to use 60 workers, but most of them (~75%) were failing. After reducing to 20 workers, I could get most of them to run. I also tried increasing the timeout to 300 s, but it helped only slightly. And it's not always the same nodes that cause the nanny timeout, so I don't think it is a node-specific issue.

Manually submitting jobs with qsub works fine, even for as many as 80 jobs.

desh2608 avatar Apr 11 '22 13:04 desh2608

@desh2608:

  • First, on the workers that are failing, do you see the same error message as @TomMaullin?
  • In the Scheduler logs, what are you seeing?
  • This might mean that your workers are slow to start; two points to look at:
    • On which file system is your Python environment installed? Is it fast?
    • Do you use the local_directory kwarg to put the Workers' local working directory on node-local storage like /tmp? (See the sketch below.)
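
For that last point, a minimal sketch (assuming SGE defines $TMPDIR on the compute nodes; otherwise /tmp or whatever local scratch space is available):

```python
from dask_jobqueue import SGECluster

# '$TMPDIR' is left for the shell to expand inside the job script, so each
# worker gets scratch space local to its own compute node rather than a
# shared filesystem.
cluster = SGECluster(cores=1,
                     memory="100GB",
                     local_directory="$TMPDIR")
```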

guillaumeeb avatar Apr 14 '22 13:04 guillaumeeb

@desh2608 @TomMaullin do you still face these issues? Without more input from you in the coming weeks, I'm going to close this issue.

guillaumeeb avatar Aug 14 '22 06:08 guillaumeeb

Sorry, I switched to using qsub instead since I had deadlines to meet. As far as I can recall, I was seeing the same error as reported by @TomMaullin. You can close this issue for now. I will reopen it later with details if I face the problem again on a different project. Thanks for your help!

desh2608 avatar Aug 14 '22 07:08 desh2608

I'm closing the issue since there is no answer from @TomMaullin. Feel free to reopen if you have new experiments to add.

guillaumeeb avatar Aug 30 '22 05:08 guillaumeeb

Apologies, I must have missed this response; I am no longer facing this issue as our system changed from SGE to SLURM.

TomMaullin avatar Apr 08 '24 13:04 TomMaullin