Set interface in `SLURMRunner`
Hello from a new user! I'm putting this here rather than opening a new issue, but let me know if I should do the latter instead.
Following the documentation, I am trying to run my very first "hello dask" script that looks like the following:
```python
from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner() as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")
```

When I submit the job using Slurm, I get the following network-related warning:

```
2025-02-12 16:22:11,565 - distributed.scheduler - INFO - State start
/home/sm69/.conda/envs/pyathena/lib/python3.13/site-packages/distributed/utils.py:189: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to hostname: [Errno 101] Network is unreachable
  warnings.warn(
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - Scheduler at: tcp://10.33.81.152:35737
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - dashboard at: http://10.33.81.152:8787/status
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2025-02-12 16:22:11,647 - distributed.scheduler - INFO - Receive client connection: Client-6c2bbb5b-e987-11ef-b579-78ac4413ab38
2025-02-12 16:22:11,647 - distributed.core - INFO - Starting established connection to tcp://10.33.81.152:58686
2025-02-12 16:22:11,658 - distributed.worker - INFO - Start worker at: tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO - Listening to: tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO - Start worker at: tcp://10.33.81.152:38967
2025-02-12 16:22:11,658 - distributed.worker - INFO - Start worker at: tcp://10.33.81.152:44313
2025-02-12 16:22:11,658 - distributed.worker - INFO - Start worker at: tcp://10.33.81.152:42309
2025-02-12 16:22:11,658 - distributed.worker - INFO - Worker name: 9
2025-02-12 16:22:11,659 - distributed.worker - INFO - dashboard at: 10.33.81.152:46699
2025-02-12 16:22:11,659 - distributed.worker - INFO - Waiting to connect to: tcp://10.33.81.152:35737
2025-02-12 16:22:11,659 - distributed.worker - INFO - Start worker at: tcp://10.33.81.152:34517
...
```

Followed by
`StreamClosedError` and `CommClosedError`.

Before getting into the Runner, I had already tried using `SLURMCluster`, e.g.:

```python
ncores = 96
SLURMCluster(cores=ncores, memory='720 GiB', processes=ncores, interface="ib0")
```

As you can see here, I had to set
`interface="ib0"` (the cluster uses InfiniBand for inter-node communication); otherwise I got a similar error. This made me think that I have to set something similar to `interface="ib0"` when using `SLURMRunner` as well, but I couldn't find such an option in the documentation. Could you guide me on what to do?

Somewhat related feedback from a new user's perspective: it was a surprise to me when I first realized that `SLURMCluster` does not support multi-node jobs. It was not mentioned explicitly in the documentation, and I had to surf through several issues to realize that this is the case. I think one of the main motivations for using Dask is to overcome the single-node memory bound when analyzing large simulation data, so I naively assumed that `dask-jobqueue` would support multi-node jobs. It might be very helpful if the documentation explicitly said that `SLURMCluster` cannot submit multi-node jobs.
Originally posted by @sanghyukmoon in #638
You should be able to configure the interface today like this:

```python
from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(
    scheduler_options={"interface": "ib0"},
    worker_options={"interface": "ib0"},
) as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")
```
I agree it might be nice to have a top-level `interface=` keyword that we pass along to both options under the hood. We already do this for `scheduler_file=`.
https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/slurm.py#L206
https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/slurm.py#L228-L235
One way we could implement this would be to capture `**kwargs` in `BaseRunner.__init__()` and update those into the `scheduler_options` and `worker_options`.
https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/runner.py#L61-L70
That way you could pass arbitrary kwargs to `SLURMRunner`, and as long as they exist in both the `Scheduler` and `Worker` classes they would get passed all the way down.
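A minimal sketch of that idea (the helper name and merge semantics here are assumptions for illustration, not the actual `BaseRunner` code):

```python
# Hypothetical sketch: capture arbitrary **kwargs and update them into
# both option dicts, as BaseRunner.__init__() could do internally.
def merge_runner_kwargs(scheduler_options=None, worker_options=None, **kwargs):
    # Copy so the caller's dicts are not mutated, then fold in the
    # shared kwargs (later keys win, mirroring dict.update semantics).
    scheduler_options = {**(scheduler_options or {}), **kwargs}
    worker_options = {**(worker_options or {}), **kwargs}
    return scheduler_options, worker_options

# Both dicts pick up interface="ib0"; workers also keep their own nthreads.
sched, work = merge_runner_kwargs(worker_options={"nthreads": 2}, interface="ib0")
```

The runner would then pass the merged dicts on to the `Scheduler` and `Worker` constructors as it does today.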
> You should be able to configure the interface today like this:
>
> ```python
> from dask.distributed import Client
> from dask_jobqueue.slurm import SLURMRunner
>
> with SLURMRunner(
>     scheduler_options={"interface": "ib0"},
>     worker_options={"interface": "ib0"},
> ) as runner:
>     with Client(runner) as client:
>         client.wait_for_workers(runner.n_workers)
>         print(f"Number of workers = {runner.n_workers}")
> ```
This indeed worked, thanks!
Is there a use case where the scheduler and worker processes rely on different network interfaces? Otherwise, one could simply have a single `interface=` argument as you suggested.
Sometimes you might want the client <> scheduler communication to be on a different interface than the worker <> scheduler communication. Being able to set `scheduler_options` and `worker_options` gives a lot of flexibility when configuring clusters.
However, I think 99% of the time, if you are setting the interface to `ib0` you want to set it everywhere. So adding a single `interface=` kwarg as well would make this more pleasant for most users.
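For that single keyword, one plausible precedence rule (a sketch only; `expand_interface` is a hypothetical helper and `interface=` is not currently a `SLURMRunner` argument) is to let the shared value fill in both roles unless a per-role option already sets it:

```python
# Hypothetical: fan one interface= value out to scheduler and workers,
# without clobbering an explicitly set per-role interface.
def expand_interface(interface=None, scheduler_options=None, worker_options=None):
    scheduler_options = dict(scheduler_options or {})
    worker_options = dict(worker_options or {})
    if interface is not None:
        scheduler_options.setdefault("interface", interface)
        worker_options.setdefault("interface", interface)
    return scheduler_options, worker_options

# Scheduler keeps its explicit eth0 (say, for client traffic);
# workers pick up the shared ib0.
sched, work = expand_interface("ib0", scheduler_options={"interface": "eth0"})
```

This keeps the flexible per-role dicts working exactly as before while covering the common "set it everywhere" case with one argument.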