
Set interface in `SLURMRunner`

Open jacobtomlinson opened this issue 11 months ago • 4 comments

Hello from a new user! I'm putting this here rather than opening a new issue, but let me know if I should do the latter instead.

Following the documentation, I am trying to run my very first "hello dask" script that looks like the following:

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner() as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

When I submit the job with Slurm, I get the following network-related warning:

2025-02-12 16:22:11,565 - distributed.scheduler - INFO - State start
/home/sm69/.conda/envs/pyathena/lib/python3.13/site-packages/distributed/utils.py:189: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to hostname: [Errno 101] Network is unreachable
  warnings.warn(
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   Scheduler at:  tcp://10.33.81.152:35737
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   dashboard at:  http://10.33.81.152:8787/status
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2025-02-12 16:22:11,647 - distributed.scheduler - INFO - Receive client connection: Client-6c2bbb5b-e987-11ef-b579-78ac4413ab38
2025-02-12 16:22:11,647 - distributed.core - INFO - Starting established connection to tcp://10.33.81.152:58686
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -          Listening to:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:38967
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:44313
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42309
2025-02-12 16:22:11,658 - distributed.worker - INFO -           Worker name:                          9
2025-02-12 16:22:11,659 - distributed.worker - INFO -          dashboard at:         10.33.81.152:46699
2025-02-12 16:22:11,659 - distributed.worker - INFO - Waiting to connect to:   tcp://10.33.81.152:35737
2025-02-12 16:22:11,659 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:34517
...

This is followed by a `StreamClosedError` and a `CommClosedError`.

Before getting into the Runner, I had already tried using `SLURMCluster`, e.g.:

from dask_jobqueue import SLURMCluster

ncores = 96
SLURMCluster(cores=ncores, memory='720 GiB', processes=ncores, interface="ib0")

As you can see, I had to set `interface="ib0"` (the cluster uses InfiniBand for inter-node communication); otherwise I got a similar error.
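As an aside, if you are unsure which interface names exist on a compute node, one way to check (a sketch using only the Python standard library, not something from this thread) is `socket.if_nameindex()`:

```python
# Sketch: list the network interface names available on this node
# (Linux/Unix only). Useful for finding names like "ib0" or "eth0"
# to pass as the interface= setting.
import socket

names = [name for _, name in socket.if_nameindex()]
print(names)  # the exact names depend on the node's hardware
```

Running this inside a Slurm job (rather than on a login node) shows the interfaces the workers will actually see.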

This made me think that I have to do something similar to `interface="ib0"` when using `SLURMRunner` as well, but I couldn't find any such option in the documentation. Could you tell me what to do?

Somewhat related feedback from a new user's perspective: it was a surprise when I first realized that `SLURMCluster` does not support multi-node jobs. This was not mentioned explicitly in the documentation, and I had to dig through several issues before realizing it was the case. I think one of the main motivations for using dask is to overcome the single-node memory limit when analyzing large simulation data, so I naively assumed that dask-jobqueue would support multi-node jobs. It would be very helpful if the documentation explicitly said that `SLURMCluster` cannot submit multi-node jobs.

Originally posted by @sanghyukmoon in #638

jacobtomlinson avatar Feb 13 '25 11:02 jacobtomlinson

You should be able to configure the interface today like this:

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(scheduler_options={"interface": "ib0"}, worker_options={"interface": "ib0"}) as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

I agree it might be nice to add a top-level `interface=` keyword that we pass along to both sets of options under the hood. We do this already for `scheduler_file=`.

https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/slurm.py#L206

https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/slurm.py#L228-L235
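For illustration, the merging behaviour such a top-level keyword could have might look like the following sketch. None of these names are part of the dask-jobqueue API; `merge_interface` is a hypothetical helper, and the "don't override an explicit setting" choice is an assumption:

```python
# Hypothetical sketch: fold a shared interface= setting into both the
# scheduler and worker option dicts before they reach the Scheduler and
# Worker classes. An explicitly-set per-role value wins over the shared one.

def merge_interface(interface, scheduler_options=None, worker_options=None):
    scheduler_options = dict(scheduler_options or {})
    worker_options = dict(worker_options or {})
    if interface is not None:
        # setdefault: only fill in the interface where it wasn't given
        scheduler_options.setdefault("interface", interface)
        worker_options.setdefault("interface", interface)
    return scheduler_options, worker_options

sched, work = merge_interface("ib0", worker_options={"interface": "eth0"})
print(sched)  # {'interface': 'ib0'}
print(work)   # {'interface': 'eth0'} - explicit value preserved
```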

jacobtomlinson avatar Feb 13 '25 11:02 jacobtomlinson

One way we could implement this would be to capture `**kwargs` in `BaseRunner.__init__()` and merge them into the `scheduler_options` and `worker_options`.

https://github.com/dask/dask-jobqueue/blob/e67631da9a6a7c8e133b533828d71fb2a87db363/dask_jobqueue/runner.py#L61-L70

That way you could pass arbitrary kwargs to `SLURMRunner`, and as long as they exist in both the `Scheduler` and `Worker` classes they would get passed all the way down.
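A minimal sketch of that idea, with purely illustrative class and attribute names (this is not the real `BaseRunner` signature):

```python
# Hypothetical sketch: capture extra keyword arguments in __init__ and
# apply them to both the scheduler and worker option dicts, so a single
# kwarg like interface="ib0" reaches both sides.

class BaseRunnerSketch:
    def __init__(self, scheduler_options=None, worker_options=None, **common):
        # common kwargs are merged on top of any per-role options
        self.scheduler_options = {**(scheduler_options or {}), **common}
        self.worker_options = {**(worker_options or {}), **common}

runner = BaseRunnerSketch(
    scheduler_options={"dashboard": True},
    interface="ib0",  # applied to scheduler and workers alike
)
print(runner.scheduler_options)  # {'dashboard': True, 'interface': 'ib0'}
print(runner.worker_options)     # {'interface': 'ib0'}
```

In a real implementation the shared kwargs would still need validating against what the `Scheduler` and `Worker` constructors actually accept.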

jacobtomlinson avatar Feb 13 '25 11:02 jacobtomlinson

You should be able to configure the interface today like this:

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(scheduler_options={"interface": "ib0"}, worker_options={"interface": "ib0"}) as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

This indeed worked, thanks!

Is there a use case where the scheduler and worker processes rely on different network interfaces? Otherwise, a single `interface=` argument, as you suggested, would suffice.

sanghyukmoon avatar Feb 13 '25 16:02 sanghyukmoon

Sometimes you might want the client <> scheduler communication to be on a different interface than the worker <> scheduler communication. Being able to set `scheduler_options` and `worker_options` separately gives a lot of flexibility when configuring clusters.

However, I think 99% of the time, if you are setting the interface to `ib0`, you want to set it everywhere. So adding a single `interface=` kwarg as well would make this more pleasant for most users.

jacobtomlinson avatar Feb 14 '25 10:02 jacobtomlinson