
No workers info available after scale() with SLURMCluster

AChatzigoulas opened this issue on Dec 17 '19 · 28 comments

I have a very similar issue to #246. After I create the cluster and scale it, the nodes are running but no CPUs are available.

```
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster()
DEBUG:Using selector: EpollSelector
DEBUG:Using selector: EpollSelector
cluster.scale(jobs = 2)
DEBUG:Starting worker: 1
DEBUG:writing job script: 
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A pr008033
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=45G
#SBATCH -t 00:30:00
#SBATCH -o myjob.%j.out
#SBATCH -e myjob.%j.err

JOB_ID=${SLURM_JOB_ID%;*}

/users/pr008/...
tcp://195.251.23.79:33491 --nthreads 20 --memory-limit 48.00GB --name 1 --nanny --death-timeout 60 --interface ib0

DEBUG:Executing the following command to command line
sbatch /tmp/tmpoals04ok.sh
DEBUG:Starting job: 804606
DEBUG:Starting worker: 0
DEBUG:writing job script: 
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A pr008033
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=45G
#SBATCH -t 00:30:00
#SBATCH -o myjob.%j.out
#SBATCH -e myjob.%j.err

JOB_ID=${SLURM_JOB_ID%;*}

/users/pr008/...
tcp://195.251.23.79:33491 --nthreads 20 --memory-limit 48.00GB --name 0 --nanny --death-timeout 60 --interface ib0

DEBUG:Executing the following command to command line
sbatch /tmp/tmpsuq3ysbm.sh
DEBUG:Starting job: 804607
from dask.distributed import Client
client = Client(cluster)
client
Client Scheduler: tcp://195.251.23.79:33491 Dashboard: http://195.251.23.79:8787/status Cluster Workers: 0 Cores: 0 Memory: 0 B
client.scheduler_info()
{'type': 'Scheduler',
 'id': 'Scheduler-5198da07-3700-42aa-a068-036146073ef6',
 'address': 'tcp://195.251.23.79:33491',
 'services': {'dashboard': 8787},
 'workers': {}}
```

When I run `squeue`:

```
JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
804606   compute dask-wor  ...  R       6:04      1 node287
804607   compute dask-wor  ...  R       6:04      1 node003
```

The HPC has Infiniband and each node has 20 CPUs. If you need any other info, please tell me.

Thank you

AChatzigoulas avatar Dec 17 '19 09:12 AChatzigoulas

Hi @Alexinho3,

First, a naive question: do you wait for some time before printing your Client object?

Second, can you look at the stderr and stdout files for your worker jobs?

guillaumeeb avatar Dec 17 '19 21:12 guillaumeeb

Thank you @guillaumeeb for your answer. Yes, I wait until the nodes are active. The stderr and stdout files are empty. I work in a miniconda environment with Python 3.6.9 and:

```
# Name                    Version                   Build  Channel
dask                      2.9.0                      py_0
dask-core                 2.9.0                      py_0
dask-jobqueue             0.7.0                      py_0    conda-forge
distributed               2.9.0                      py_0
```

Additionally, when I type `cluster`, it returns:

```
Workers | 0 / 2
Cores | 0
Memory | 0 B
```

AChatzigoulas avatar Dec 18 '19 11:12 AChatzigoulas

Just a small comment: readability counts, a lot! Please use triple back-quotes, aka fenced code blocks, to format error messages and code snippets. Bonus points if you use syntax highlighting with `py` for Python snippets and `pytb` for tracebacks.

I have edited your earlier messages accordingly but it would be great if you could try to do it next time!

lesteve avatar Dec 19 '19 03:12 lesteve

It is weird that you don't get any stdout/stderr in your SLURM logs ...

Anyway, what this looks like is that your jobs start correctly but the Dask workers cannot connect to the Dask scheduler. One general way of fixing this is to specify `interface` in `SLURMCluster`.

Have a look at https://blog.dask.org/2019/08/28/dask-on-summit#workers-dont-connect-to-the-scheduler for more details and let us know if that helps!
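If you are not sure which network interfaces exist on the compute nodes, one quick way to list them from inside a job is via psutil (already a dependency of distributed); a minimal sketch:

```py
# List network interfaces and their IPv4 addresses as seen from this node.
# Run it inside a job on a compute node; you would expect to see 'ib0' for Infiniband.
import socket
import psutil

for name, addrs in psutil.net_if_addrs().items():
    ipv4 = [a.address for a in addrs if a.family == socket.AF_INET]
    print(name, ipv4)
```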

lesteve avatar Dec 19 '19 03:12 lesteve

Thank you for your answer and the readability corrections @lesteve. I'll keep that in mind for the future.

I have already specified the interface to be ib0. If I understand correctly, when I type `client.scheduler_info()` the address shown should be that of ib0? If that's true, then in my case it is one of the Ethernet IPs.

AChatzigoulas avatar Dec 19 '19 09:12 AChatzigoulas

> Thank you for your answer and the readability corrections @lesteve. I'll keep that in mind for the future.

Great to hear! IMO it does make a big difference if you want to have the best chance of getting good feedback.

> I have already specified the interface to be ib0. If I understand correctly, when I type `client.scheduler_info()` the address shown should be that of ib0? If that's true, then in my case it is one of the Ethernet IPs.

Since you are using `SLURMCluster()` without arguments in your snippet, it does look like you are using a jobqueue.yaml configuration file and that you set the interface in jobqueue.yaml. Can you confirm this is the case?

If this is the case you may well be hit by https://github.com/dask/dask-jobqueue/issues/358 which is fixed in master (#366).

  • as a work-around, you could try to specify the interface in the `SLURMCluster` arguments: `cluster = SLURMCluster(interface='ib0')`
  • or alternatively, keep interface in your jobqueue.yaml and install the development version of dask-jobqueue, which has a fix for this: `pip install git+https://github.com/dask/dask-jobqueue`

In any case let us know whether that works for you!

lesteve avatar Dec 19 '19 14:12 lesteve

Both the work-around and the alternative way worked, and the IP is now the Infiniband one. I also found out that I was using a wrong shebang and fixed that too, but sadly the problem still persists. Can you please guide me on what else might be wrong?

The OS is Red Hat Enterprise Linux Server release 6.10 (Santiago); I don't know if it matters.

AChatzigoulas avatar Dec 19 '19 16:12 AChatzigoulas

Hmmm hard to tell, can you see your SLURM logs now?

You may have seen it already but maybe this can help: https://jobqueue.dask.org/en/latest/debug.html

Something else I would try is to do it by hand (i.e. launching dask-scheduler and dask-worker yourself) outside of dask-jobqueue following: https://docs.dask.org/en/latest/setup/hpc.html#using-a-shared-network-file-system-and-a-job-scheduler

lesteve avatar Dec 19 '19 17:12 lesteve

Yeap, still empty.

I tried the manual way:

```
qsub -b y ~/miniconda3/envs/alekos/bin/dask-scheduler --scheduler-file /users/pr008/user/scheduler.json --interface ib0
qsub -b y ~/miniconda3/envs/alekos/bin/dask-worker --scheduler-file /users/pr008/user/scheduler.json

squeue -u user
805813   compute dask-wor  user  R       3:31      1 node144
805809   compute dask-sch  user  R       6:54      1 node144
```

Then I try to create the client:

```py
from dask.distributed import Client
client = Client(scheduler_file='/users/pr008/user/scheduler.json')
```

and this hangs forever.

AChatzigoulas avatar Dec 19 '19 18:12 AChatzigoulas

Hmmm, can you post the content of scheduler.json?

lesteve avatar Dec 20 '19 10:12 lesteve

I just saw that it does not create the scheduler.json file.

AChatzigoulas avatar Dec 20 '19 10:12 AChatzigoulas

Other questions:

  • can you see your SLURM logs with the manual way?
  • you use `qsub` in your message above, but I assume you meant `sbatch`, right?

lesteve avatar Dec 20 '19 10:12 lesteve

Yes, these logs are again empty. I tried both qsub and sbatch; both of them start the scheduler and the worker, but there is still no connection with the client. Also, the scheduler.json file is not created. When I try on the interactive node (without the --interface ib0) the file is written just fine.

AChatzigoulas avatar Dec 20 '19 10:12 AChatzigoulas

This is very weird, and probably due to some quirks specific to your cluster setup. Can you post a link to the cluster documentation if it exists, mostly out of curiosity?

> When I try on the interactive node (without the --interface ib0) the file is written just fine.

  • by interactive node, do you mean the login node, i.e. the nodes that you ssh to when you want to run commands like squeue, sbatch, etc ...
  • why without --interface ib0, because ib0 is not an interface on the login node?

lesteve avatar Dec 20 '19 13:12 lesteve

The webpage is http://doc.aris.grnet.gr/

Yes, I mean the login node, and indeed ib0 is an interface on the login node. The sysadmin told me that, although he doesn't know about dask, if the client starts from the login node it won't work. So I tried it on the "compute" partition, which also has a different ib0 address than the login node, but still no luck.

I will try to set up everything from the beginning in the "compute" partition to see if it works.

AChatzigoulas avatar Dec 20 '19 14:12 AChatzigoulas

~Maybe something you could try is to ask your cluster IT to help you out if you get stuck.~ Sorry, I read your message too quickly and missed the fact that you are in contact with IT already.

I would break down the problem into pieces if I were you:

1. Submit a job that uses Python (no Dask).

What could go wrong:

  • maybe you need to activate your conda environment, use module load, make sure your .bashrc is sourced, or something similar
  • maybe the folder where your python lives (`which python`) is not available on the compute nodes.

Check that you can find your SLURM log and that it is not empty if you use print in your python script (a minimal check script is sketched after these steps).

2. Submit a job that starts the Dask scheduler.

  • you should be able to see scheduler.json created (make sure it is on a filesystem that you can see from the login node)
  • the SLURM log should not be empty. It should contain something like what `dask-scheduler` prints when you run it on your local machine.

3. Submit a job that starts the Dask workers.

  • the SLURM log should not be empty
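For step 1, a minimal check script could look something like this (the file name `check_env.py` and the exact prints are just for illustration; run it with `python check_env.py` from your submission script):

```py
# check_env.py -- hypothetical sanity check for the Python environment a SLURM job sees.
# Everything printed here should end up in the job's stdout log.
import sys

print("python executable:", sys.executable)  # should point into your miniconda environment
print("python version:", sys.version)

try:
    import distributed
    print("distributed:", distributed.__version__, "from", distributed.__file__)
except ImportError:
    print("distributed is not importable in this environment")
```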

lesteve avatar Dec 20 '19 15:12 lesteve

It would help a lot to have the SLURM logs from the Dask scheduler and the Dask workers, so that's what I would try first.

lesteve avatar Dec 20 '19 15:12 lesteve

I managed to run a job with a simple python script with correct logs.

Then a job with the dask-scheduler

```bash
#!/bin/bash -l

#SBATCH --job-name=pyth_script    # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=00:25:00

module purge
module load gnu python/3.6.5

srun dask-scheduler --scheduler-file /pr008/user/scheduler.json --interface ib0
```

with outputs: scheduler.json

```json
{
  "type": "Scheduler",
  "id": "Scheduler-0ba502f4-a060-48a1-b51d-e52f9f713802",
  "address": "tcp://192.168.132.55:8786",
  "services": {},
  "workers": {}
}
```

`cat jobname.806969.err`:

```
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Web dashboard not loaded.  Unable to import bokeh
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://192.168.132.55:8786
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-11vlqb65
distributed.scheduler - INFO - -----------------------------------------------
```

jobname.806969.out is empty

And then a job with the dask-worker

```bash
#!/bin/bash -l

#SBATCH --job-name=pyth_script    # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=00:25:00

module purge
module load gnu python/3.6.5

srun dask-worker --scheduler-file /pr008/user/scheduler.json
```

with outputs:

`cat jobname.806970.err`:

```
distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.116.9:44626'
distributed.worker - INFO -       Start worker at:  tcp://192.168.116.9:36536
distributed.worker - INFO -          Listening to:  tcp://192.168.116.9:36536
distributed.worker - INFO -              nanny at:        192.168.116.9:44626
distributed.worker - INFO - Waiting to connect to:  tcp://192.168.132.55:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         20
distributed.worker - INFO -                Memory:                   67.67 GB
distributed.worker - INFO -       Local Directory: /pr008/user/worker-962pinux
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:  tcp://192.168.132.55:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
```

jobname.806970.out is empty

Everything seems fine. Also, the address is different from the previous one and is not in the list of ifconfig.

Then in ipython in my conda environment:

```py
from dask.distributed import Client
client = Client(scheduler_file='/pr008/user/scheduler.json')
client
```

This gives the error

```pytb
Out[3]: ---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    397                         if cls is not object \
    398                                 and callable(cls.__dict__.get('__repr__')):
--> 399                             return _repr_pprint(obj, self, cycle)
    400 
    401             return _default_pprint(obj, self, cycle)

~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    687     """A pprint that just redirects to the normal repr function."""
    688     # Find newlines and replace them with p.break_()
--> 689     output = repr(obj)
    690     for idx,output_line in enumerate(output.splitlines()):
    691         if idx:

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in __repr__(self)
    776             workers = info.get("workers", {})
    777             nworkers = len(workers)
--> 778             nthreads = sum(w["nthreads"] for w in workers.values())
    779             text = "<%s: %r processes=%d threads=%d" % (
    780                 self.__class__.__name__,

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in <genexpr>(.0)
    776             workers = info.get("workers", {})
    777             nworkers = len(workers)
--> 778             nthreads = sum(w["nthreads"] for w in workers.values())
    779             text = "<%s: %r processes=%d threads=%d" % (
    780                 self.__class__.__name__,

KeyError: 'nthreads'
```

AChatzigoulas avatar Dec 20 '19 17:12 AChatzigoulas

Good to see that the Dask workers now manage to connect to the Dask scheduler, as you can see from this line in the Dask worker log:

```
distributed.worker - INFO -         Registered to:  tcp://192.168.132.55:8786
```

It is a bit of a guess, but maybe there is a Dask version mismatch between your Dask client, which uses your miniconda python, and your Dask workers and scheduler, which use the python provided by your cluster sys-admins (i.e. the one that you get from `module load python/3.6.5`).

As general guidance, using the same python environment (not only the same Dask version) for your client, scheduler and workers is strongly recommended.

In your case, I am guessing you can add the necessary bash commands to activate your conda environment in your SLURM submission scripts (side-comment: dask-jobqueue helps a bit with this, but that is a different topic).

lesteve avatar Dec 21 '19 11:12 lesteve

@lesteve thank you very much for your help so far. Indeed, my conda environment's python version was 3.6.9 and I changed it to 3.6.5 to match the system's. I'm also activating the conda environment in the SLURM submission scripts:

```bash
module load gnu python/3.6.5

source /pr008/user/.bashrc
source activate /pr008/user/miniconda3/bin/activate /pr008/user/miniconda3/envs/alekos
```

but I have the same error. Then I checked the versions because there might be some problem there:

```py
from dask.distributed import Client
client = Client(scheduler_file='/pr008/user/scheduler.json')
client.get_versions(check=True)
```

```pytb
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-3-4a95fbecd8f8> in <module>
----> 1 client.get_versions(check=True)

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in get_versions(self, check, packages)
   3546         >>> c.get_versions(packages=['sklearn', 'geopandas'])  # doctest: +SKIP
   3547         """
-> 3548         return self.sync(self._get_versions, check=check, packages=packages)
   3549 
   3550     async def _get_versions(self, check=False, packages=[]):

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    766         else:
    767             return sync(
--> 768                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    769             )
    770 

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    332     if error[0]:
    333         typ, exc, tb = error[0]
--> 334         raise exc.with_traceback(tb)
    335     else:
    336         return result[0]

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/utils.py in f()
    316             if callback_timeout is not None:
    317                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 318             result[0] = yield future
    319         except Exception as exc:
    320             error[0] = sys.exc_info()

~/miniconda3/envs/alekos/lib/python3.6/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in _get_versions(self, check, packages)
   3587 
   3588                 raise ValueError(
-> 3589                     "Mismatched versions found\n\n%s" % ("\n\n".join(errs))
   3590                 )
   3591 

ValueError: Mismatched versions found

bokeh
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 1.4.0   |
| scheduler                  | None    |
| tcp://192.168.111.46:34311 | None    |
+----------------------------+---------+

cloudpickle
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 1.2.2   |
| scheduler                  | 0.5.3   |
| tcp://192.168.111.46:34311 | 0.5.3   |
+----------------------------+---------+

dask
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 2.9.0   |
| scheduler                  | 2.0.0   |
| tcp://192.168.111.46:34311 | 2.0.0   |
+----------------------------+---------+

dask_ml
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 1.1.1   |
| scheduler                  | 1.0.0   |
| tcp://192.168.111.46:34311 | 1.0.0   |
+----------------------------+---------+

distributed
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 2.9.0   |
| scheduler                  | 1.28.1  |
| tcp://192.168.111.46:34311 | 1.28.1  |
+----------------------------+---------+

numpy
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 1.17.4  |
| scheduler                  | 1.16.4  |
| tcp://192.168.111.46:34311 | 1.16.4  |
+----------------------------+---------+

pandas
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 0.25.3  |
| scheduler                  | 0.23.4  |
| tcp://192.168.111.46:34311 | 0.23.4  |
+----------------------------+---------+

toolz
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 0.10.0  |
| scheduler                  | 0.9.0   |
| tcp://192.168.111.46:34311 | 0.9.0   |
+----------------------------+---------+

tornado
+----------------------------+---------+
|                            | version |
+----------------------------+---------+
| client                     | 6.0.3   |
| scheduler                  | 6.0.2   |
| tcp://192.168.111.46:34311 | 6.0.2   |
+----------------------------+---------+
```

A hell of a lot of mismatches, and I cannot understand where they come from... How can I upgrade the scheduler's packages?

AChatzigoulas avatar Dec 23 '19 10:12 AChatzigoulas

The error seems to indicate that your jobs don't use your miniconda environment for the dask worker and the dask scheduler. You should probably ask your cluster IT if you get stuck, but if I were you I would try something like this in your submission script:

```bash
# don't use module load for python; activate your miniconda environment instead
# (maybe you need to source your .bashrc, depending on your setup)
conda activate your-conda-environment

# To debug, make sure that your conda environment's python is used
which python
python -c 'import distributed; print(distributed.__file__); print(distributed.__version__)'
```

lesteve avatar Jan 04 '20 07:01 lesteve

Dear @lesteve happy new year!

That solved the issue. I can now manually connect the workers with the scheduler. Thank you very much for your help and cooperation. I guess it is impossible to automate the process in my case but it is ok.

I am trying to use RandomizedSearchCV with joblib and dask, and I face the same issue as in #959. I started 2 workers so I have 40 cores.

```py
import pandas as pd
from sklearn.ensemble import ExtraTreesClassifier
import joblib
from sklearn import model_selection as ms

from dask.distributed import Client
   
client = Client(scheduler_file='/pr008/user/scheduler.json')

df = pd.read_csv('dataset.csv', thousands = ',' , low_memory=False)

ExtraTreesClassifier_params = {'criterion': ['gini', 'entropy'], 'max_features': [3, 5, 10, 25, 50, 'auto', 'log2', None], 'n_estimators': [100],
                                 'max_depth': [None], 'min_samples_split': [2, 5, 10, 0.1], 'min_impurity_decrease': [1e-7, 1e-6, 1e-5, 1e-4, 1e-3] + [0],
                                 'warm_start': [True, False], 'min_samples_leaf': [1, 2, 5, 10, 0.1], 'class_weight' : ['balanced'],
                                 'bootstrap': [True, False]}
n_iter_search = 200
clf = ExtraTreesClassifier()

sk_random_search = ms.RandomizedSearchCV(clf, ExtraTreesClassifier_params, verbose = 2, scoring = 'f1_macro', n_iter=n_iter_search, n_jobs=-1)

with joblib.parallel_backend('dask'):
    sk_random_search.fit(df.drop(columns=['class']), df['class'])
```

And I get the following

```
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 40 concurrent workers.
```

Then a bunch of times this error

```pytb
distributed.client - ERROR - Error in callback <function DaskDistributedBackend.apply_async.<locals>.callback_wrapper at 0x7ff35833ae18> of <Future: error, key: _fit_and_score-batch-57cb4d49e2f943209d604df2c9e71766>:
Traceback (most recent call last):
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 288, in execute_callback
    fn(fut)
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 260, in callback_wrapper
    result = future.result()
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 223, in result
    raise exc.with_traceback(tb)
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 106, in __call__
    results.append(func(*args, **kwargs))
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_validation.py", line 516, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 340, in fit
    self.n_classes_ = self.n_classes_[0]
TypeError: 'int' object is not subscriptable
```

and a bunch of times this error

```pytb
distributed.client - ERROR - Error in callback <function DaskDistributedBackend.apply_async.<locals>.callback_wrapper at 0x7fa7b52a3ae8> of <Future: cancelled, key: _fit_and_score-batch-817993e9946f404f9917b480ef83fdc1>:
Traceback (most recent call last):
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 288, in execute_callback
    fn(fut)
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 260, in callback_wrapper
    result = future.result()
  File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 225, in result
    raise result
concurrent.futures._base.CancelledError: _fit_and_score-batch-817993e9946f404f9917b480ef83fdc1
```

and in the end

```pytb
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-14e28483608f> in <module>
      2 with joblib.parallel_backend('dask'):
----> 3     sk_random_search.fit(data_scaled_all, df['class'])

~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in fit(self, X, y, groups, **fit_params)
    686                 return results
    687 
--> 688             self._run_search(evaluate_candidates)
    689 
    690         # For multi-metric evaluation, store the best_index_, best_params_ and

~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in _run_search(self, evaluate_candidates)
   1467         evaluate_candidates(ParameterSampler(
   1468             self.param_distributions, self.n_iter,
-> 1469             random_state=self.random_state))

~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in evaluate_candidates(candidate_params)
    665                                for parameters, (train, test)
    666                                in product(candidate_params,
--> 667                                           cv.split(X, y, groups)))
    668 
    669                 if len(out) < 1:

~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
   1014 
   1015             with self._backend.retrieval_context():
-> 1016                 self.retrieve()
   1017             # Make sure that we get a last message telling us we are done
   1018             elapsed_time = time.time() - self._start_time

~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
    908                     self._output.extend(job.get(timeout=self.timeout))
    909                 else:
--> 910                     self._output.extend(job.get())
    911 
    912             except BaseException as exception:

~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py in get()
    268 
    269         def get():
--> 270             return ref().result()
    271 
    272         future.get = get  # monkey patch to achieve AsyncResult API

~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in result(self, timeout)
    221         if self.status == "error":
    222             typ, exc, tb = result
--> 223             raise exc.with_traceback(tb)
    224         elif self.status == "cancelled":
    225             raise result

~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py in __call__()
    104                 kwargs = {k: v(data) if isinstance(v, itemgetter) else v
    105                           for (k, v) in kwargs.items()}
--> 106                 results.append(func(*args, **kwargs))
    107         return results
    108 

~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_validation.py in _fit_and_score()
    514             estimator.fit(X_train, **fit_params)
    515         else:
--> 516             estimator.fit(X_train, y_train, **fit_params)
    517 
    518     except Exception as e:

~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/ensemble/forest.py in fit()
    338         # Decapsulate classes_ attributes
    339         if hasattr(self, "classes_") and self.n_outputs_ == 1:
--> 340             self.n_classes_ = self.n_classes_[0]
    341             self.classes_ = self.classes_[0]
    342 

TypeError: 'int' object is not subscriptable
```

Should I close this issue and start a new one for this, or comment on #959?

AChatzigoulas avatar Jan 07 '20 16:01 AChatzigoulas

> That solved the issue. I can now manually connect the workers with the scheduler. Thank you very much for your help and cooperation.

Great to know that it works! It would be even greater if you could post the script that you are now using. This could be useful for other people who bump into a similar issue!

> I guess it is impossible to automate the process in my case but it is ok.

I am 95% confident that if I were sitting next to you I could find a way to make it work in ~1h with SLURMCluster. This kind of problem is not super easy to solve on GitHub, unfortunately...

About your error, it is hard to tell without running your code whether it is a problem with:

  • your code vs scikit-learn vs joblib. Suggestion: Try to remove `with joblib.parallel_backend('dask'):` and see whether the problem persists. You may need to run on a subset of your parameter grid or on a subset of your data to run in a reasonable amount of time on a single machine. You can also try to run with `n_jobs=1` to be in sequential mode and see whether the problem persists.
  • dask/distributed vs dask-jobqueue. Suggestion: try to run the same code with a dask.distributed.LocalCluster to see whether the problem persists (see the sketch at the end of this comment). I would guess it is something in dask/distributed rather than dask-jobqueue, but I may be wrong about this.

Also, it is very likely not directly related to https://github.com/joblib/joblib/issues/959, because you don't do nested parallelism. In other words, you have a single level of parallelism at the RandomizedSearchCV level and none at the estimator level. https://github.com/joblib/joblib/issues/959 has nested parallelism: GridSearchCV + ColumnTransformer. It could be a manifestation of the same underlying cause; again, this is very hard to tell...
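A self-contained sketch of that LocalCluster check (the synthetic dataset and the tiny parameter grid are just placeholders to keep the run short):

```py
# Hypothetical check: run the same kind of search against a LocalCluster instead of
# the SLURM-backed scheduler, to separate dask-jobqueue issues from dask/distributed ones.
import joblib
from dask.distributed import Client, LocalCluster
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.model_selection import RandomizedSearchCV

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

X, y = make_classification(n_samples=500, n_features=20, random_state=0)
params = {'criterion': ['gini', 'entropy'], 'min_samples_split': [2, 5, 10]}
search = RandomizedSearchCV(ExtraTreesClassifier(n_estimators=50), params,
                            n_iter=5, scoring='f1_macro', n_jobs=-1)

with joblib.parallel_backend('dask'):
    search.fit(X, y)

print(search.best_params_)
```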

lesteve avatar Jan 08 '20 10:01 lesteve

> Great to know that it works! It would be even greater if you could post the script that you are now using. This could be useful for other people who bump into a similar issue!

Sure, it is the least I can do.

The Slurm script for the scheduler is

```bash
#!/bin/bash -l

#SBATCH --job-name=pyth_script    # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=20
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=01:00:00

module purge

source /users/pr008/user/.bashrc
conda activate alekos

srun dask-scheduler --scheduler-file /pr008/user/scheduler.json --interface ib0
```

and for the worker

```bash
#!/bin/bash -l

#SBATCH --job-name=pyth_script    # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=20
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=01:00:00

module purge

source /users/pr008/user/.bashrc
conda activate alekos

srun dask-worker --scheduler-file /pr008/user/scheduler.json
```

The only drawback of this approach is that I need to allocate one node just for the scheduler.

> I am 95% confident that if I were sitting next to you I could find a way to make it work in ~1h with SLURMCluster. This kind of problem is not super easy to solve on GitHub, unfortunately...

Well, I'm pretty sure you would indeed XD. I think the reason I cannot make it work with SLURMCluster is that it cannot see the IP of the compute node from the login node. When I submit the above scripts, the scheduler and the workers connect to an IP that is not shown by ifconfig. I tried to set that IP using the --host parameter in SLURMCluster but it failed.

> Try to remove `with joblib.parallel_backend('dask'):` and see whether the problem persists.

Removing it, it uses the login node's CPUs and runs just fine.

> You may need to run on a subset of your parameter grid or on a subset of your data to run in a reasonable amount of time on a single machine.

That did not help. Either way, I only had 200 iterations in RandomizedSearchCV, which is very small.

> You can also try to run with `n_jobs=1` to be in sequential mode and see whether the problem persists.

Setting `n_jobs=1` does not enable sequential mode, which is weird.

```
[Parallel(n_jobs=1)]: Using backend DaskDistributedBackend with 40 concurrent workers.
```

and the same errors as before.

If I remove `joblib.parallel_backend('dask'):`, it uses sequential mode correctly.

> try to run the same code with a dask.distributed.LocalCluster

LocalCluster works. In general, dask works on the login node.

AChatzigoulas avatar Jan 08 '20 11:01 AChatzigoulas

OK, thanks for the details. There seem to be some problems with the joblib dask backend, and maybe there are additional issues with dask-jobqueue and also some quirks of your cluster; hard to tell...

A work-around you could try is to use dask_ml.model_selection.RandomizedSearchCV, i.e. you would not be using the joblib dask backend.
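A minimal sketch of that work-around, assuming dask_ml is installed in the same environment on the client, scheduler and workers (the synthetic data and the small grid are placeholders):

```py
# Hypothetical sketch: use dask_ml's own RandomizedSearchCV, which talks to the
# distributed scheduler directly instead of going through the joblib dask backend.
from dask.distributed import Client
from dask_ml.model_selection import RandomizedSearchCV
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier

client = Client(scheduler_file='/pr008/user/scheduler.json')

X, y = make_classification(n_samples=500, n_features=20, random_state=0)
params = {'criterion': ['gini', 'entropy'], 'min_samples_split': [2, 5, 10]}

search = RandomizedSearchCV(ExtraTreesClassifier(n_estimators=50), params,
                            n_iter=5, scoring='f1_macro')
search.fit(X, y)
print(search.best_params_)
```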

Out of interest do you launch your submission scripts in https://github.com/dask/dask-jobqueue/issues/376#issuecomment-572005474 with salloc rather than sbatch? I find it a bit unusual to have srun commands in submission scripts but I am not a SLURM expert at all.

lesteve avatar Jan 10 '20 10:01 lesteve

> A work-around you could try is to use dask_ml.model_selection.RandomizedSearchCV, i.e. you would not be using the joblib dask backend.

I tried that and it worked, but it is super slow. It took almost 5 minutes for 20 iterations. For comparison, on the login node with fewer CPUs it took less than 10 seconds.

> Out of interest do you launch your submission scripts in #376 (comment) with salloc rather than sbatch? I find it a bit unusual to have srun commands in submission scripts but I am not a SLURM expert at all.

I also tried salloc but I receive an error. I follow the HPC documentation on how to submit jobs.

AChatzigoulas avatar Jan 10 '20 16:01 AChatzigoulas

> I tried that and it worked, but it is super slow. It took almost 5 minutes for 20 iterations. For comparison, on the login node with fewer CPUs it took less than 10 seconds.

Looking at the dashboard could give you some insight into why this is so slow. See this for more details. Wild guess: maybe the dataframe is serialized and sent to the workers for each computation. IIRC you can use client.scatter to send the data to the workers up front, and then each computation should be quicker.
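A rough sketch of that scatter idea for the joblib route (recent joblib versions accept a `scatter` argument on the dask backend that uploads large objects to the workers once; the CSV path is the one from your snippet):

```py
# Hypothetical sketch: pre-scatter the training data so it is sent to the workers
# once instead of being serialized again for every fit.
import joblib
import pandas as pd
from dask.distributed import Client
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.model_selection import RandomizedSearchCV

client = Client(scheduler_file='/pr008/user/scheduler.json')

df = pd.read_csv('dataset.csv', thousands=',', low_memory=False)
X, y = df.drop(columns=['class']), df['class']

params = {'criterion': ['gini', 'entropy'], 'min_samples_split': [2, 5, 10]}
search = RandomizedSearchCV(ExtraTreesClassifier(n_estimators=100), params,
                            n_iter=20, scoring='f1_macro', n_jobs=-1)

# scatter=[X, y] asks the dask backend to upload X and y to the workers up front.
with joblib.parallel_backend('dask', scatter=[X, y]):
    search.fit(X, y)
```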

lesteve avatar Jan 15 '20 11:01 lesteve

I propose to close this issue, as the initial problem was solved and there is no more activity.

guillaumeeb avatar Sep 04 '21 06:09 guillaumeeb

Closing as stale.

guillaumeeb avatar Aug 30 '22 06:08 guillaumeeb