No workers info available after scale() with SLURMCluster
I have an issue very similar to #246. After I create the cluster and scale it, the nodes are running but no CPUs are available.
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster()
DEBUG:Using selector: EpollSelector
DEBUG:Using selector: EpollSelector
cluster.scale(jobs = 2)
DEBUG:Starting worker: 1
DEBUG:writing job script:
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A pr008033
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=45G
#SBATCH -t 00:30:00
#SBATCH -o myjob.%j.out
#SBATCH -e myjob.%j.err
JOB_ID=${SLURM_JOB_ID%;*}
/users/pr008/...
tcp://195.251.23.79:33491 --nthreads 20 --memory-limit 48.00GB --name 1 --nanny --death-timeout 60 --interface ib0
DEBUG:Executing the following command to command line
sbatch /tmp/tmpoals04ok.sh
DEBUG:Starting job: 804606
DEBUG:Starting worker: 0
DEBUG:writing job script:
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A pr008033
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=45G
#SBATCH -t 00:30:00
#SBATCH -o myjob.%j.out
#SBATCH -e myjob.%j.err
JOB_ID=${SLURM_JOB_ID%;*}
/users/pr008/...
tcp://195.251.23.79:33491 --nthreads 20 --memory-limit 48.00GB --name 0 --nanny --death-timeout 60 --interface ib0
DEBUG:Executing the following command to command line
sbatch /tmp/tmpsuq3ysbm.sh
DEBUG:Starting job: 804607
from dask.distributed import Client
client = Client(cluster)
client
Client
    Scheduler: tcp://195.251.23.79:33491
    Dashboard: http://195.251.23.79:8787/status
Cluster
    Workers: 0
    Cores: 0
    Memory: 0 B
client.scheduler_info()
{'type': 'Scheduler',
'id': 'Scheduler-5198da07-3700-42aa-a068-036146073ef6',
'address': 'tcp://195.251.23.79:33491',
'services': {'dashboard': 8787},
'workers': {}}
When I run squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
804606 compute dask-wor ... R 6:04 1 node287
804607 compute dask-wor ... R 6:04 1 node003
The HPC has InfiniBand and each node has 20 CPUs. If you need any other info, please tell me.
Thank you
Hi @Alexinho3,
First a naive question: do you wait for some time before printing your Client object?
Second, can you look at the stderr and stdout files for your worker jobs?
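For example, something like this should block until the workers have registered, instead of waiting by hand (just a sketch; it assumes your distributed version provides Client.wait_for_workers):

```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster()
cluster.scale(jobs=2)

client = Client(cluster)
# Block until at least 2 workers have connected to the scheduler
# (Client.wait_for_workers is assumed to be available in your distributed version).
client.wait_for_workers(2)
print(client)
```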
Thank you @guillaumeeb for your answer. Yes, I wait until the nodes are active. The stderr and stdout files are empty. I work in a miniconda environment with Python 3.6.9 and the following packages:
# Name Version Build Channel
dask 2.9.0 py_0
dask-core 2.9.0 py_0
dask-jobqueue 0.7.0 py_0 conda-forge
distributed 2.9.0 py_0
Additionally, when I type cluster, it returns:
Workers: 0 / 2
Cores: 0
Memory: 0 B
Just a small comment: readability counts, a lot! Please use triple back-quotes, aka fenced code blocks, to format error messages and code snippets. Bonus points if you use syntax highlighting with py for python snippets and pytb for tracebacks.
I have edited your earlier messages accordingly but it would be great if you could try to do it next time!
It is weird that you don't get any stdout/stderr in your SLURM logs ...
Anyway, what this looks like is that your jobs start correctly but the Dask workers cannot connect to the Dask scheduler. One general way of fixing this is to specify interface in SLURMCluster.
Have a look at https://blog.dask.org/2019/08/28/dask-on-summit#workers-dont-connect-to-the-scheduler for more details and let us know if that helps!
Thank you for your answer and the readability corrections @lesteve. I'll keep that in mind for the future.
I have already specified the interface to be ib0. If I understand correctly, when I type
client.scheduler_info()
the address shown should be that of ib0? If that's true, then in my case it is one of the Ethernet IPs.
Thank you for your answer and the readability corrections @lesteve. I'll keep that in mind for the future.
Great to hear! IMO it does make a big difference if you want to have the best chance of getting good feedback.
I have already specified the interface to be ib0. If I understand correctly, when I type client.scheduler_info() the address shown should be that of ib0? If that's true, then in my case it is one of the Ethernet IPs.
Since you are using SLURMCluster() without arguments in your snippet, it does look like you are using a jobqueue.yaml configuration file and you set the interface in jobqueue.yaml. Can you confirm this is the case?
If this is the case you may well be hit by https://github.com/dask/dask-jobqueue/issues/358, which is fixed in master (#366).
- As a work-around, you could try to specify the interface in the SLURMCluster arguments:
  cluster = SLURMCluster(interface='ib0')
- Or alternatively, keep interface in your jobqueue.yaml and install the development version of dask-jobqueue, which has a fix for this:
  pip install git+https://github.com/dask/dask-jobqueue
In any case let us know whether that works for you!
The work-around and the alternative way both worked, and the IP is now the InfiniBand one. I also found out that I was using a wrong shebang and I fixed that too, but sadly the problem still persists. Can you please guide me on what else might be wrong?
The OS is Red Hat Enterprise Linux Server release 6.10 (Santiago); I don't know if it matters.
Hmmm hard to tell, can you see your SLURM logs now?
You may have seen it already but maybe this can help: https://jobqueue.dask.org/en/latest/debug.html
Something else I would try is to do it by hand (i.e. launching dask-scheduler and dask-worker yourself) outside of dask-jobqueue following:
https://docs.dask.org/en/latest/setup/hpc.html#using-a-shared-network-file-system-and-a-job-scheduler
Yeap, still empty.
I tried the manual way
qsub -b y ~/miniconda3/envs/alekos/bin/dask-scheduler --scheduler-file /users/pr008/user/scheduler.json --interface ib0
qsub -b y ~/miniconda3/envs/alekos/bin/dask-worker --scheduler-file /users/pr008/user/scheduler.json
squeue -u user
805813 compute dask-wor user R 3:31 1 node144
805809 compute dask-sch user R 6:54 1 node144
then I try to create the client
from dask.distributed import Client
client = Client(scheduler_file='/users/pr008/user/scheduler.json')
and this hangs forever.
Hmmm, can you post the content of scheduler.json?
I just saw that it does not create the scheduler.json file.
Other questions:
- can you see your SLURM logs with the manual way?
- you use qsub in your message above, but I assume you meant sbatch, right?
Yes, these logs are again empty. I tried both qsub and sbatch; both of them start the scheduler and the worker, but still no connection with the client. Also, the scheduler.json file is not created. When I try on the interactive node (without the --interface ib0) the file is written just fine.
This is very weird, and probably due to some quirks specific to your cluster setup. Can you post a link to the cluster documentation if it exists, mostly out of curiosity?
When I try on the interactive node (without the --interface ib0) the file is written just fine.
- by interactive node, do you mean the login node, i.e. the node that you ssh to when you want to run commands like squeue, sbatch, etc.?
- why without --interface ib0, because ib0 is not an interface on the login node?
The webpage is http://doc.aris.grnet.gr/
Yes, I mean the login node, and indeed ib0 is an interface on the login node. The sysadmin told me that, although he doesn't know about Dask, if the client starts from the login node it won't work. So I tried it on the "compute" partition, which also has a different ib0 address than the login node, but still no luck.
I will try to set up everything from the beginning in the "compute" partition to see if it works.
~~Maybe something you could try is to ask your cluster IT to help you out if you get stuck.~~ Sorry, I read your message too quickly and missed the fact that you are in contact with IT already.
I would break down the problem into pieces if I were you:
First, submit a job that uses python (no Dask). What could go wrong:
- maybe you need to activate your conda environment or to use module load, or make sure your .bashrc is sourced, or something like that
- maybe the folder where your python lives (which python) is not available on the compute nodes

Check that you can find your SLURM log and that it is not empty if you use print in your python script.
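For that first step, a minimal test script could be as simple as the following (test_env.py is a hypothetical name; it only checks which python and which distributed the job actually picks up):

```python
# test_env.py -- hypothetical minimal script for step 1 (no Dask cluster involved)
import sys
print("python executable:", sys.executable)

import distributed  # just to check which environment/packages the job sees
print("distributed:", distributed.__version__, distributed.__file__)
```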
Second, submit a job that starts the Dask scheduler:
- you should be able to see scheduler.json created (make sure it is on a filesystem that you can see from the login node)
- the SLURM log should not be empty; it should contain something like what dask-scheduler prints when you run it on your local machine

Third, submit a job that starts the Dask workers:
- the SLURM log should not be empty
It would help a lot to have the SLURM logs from the Dask scheduler and the Dask workers, so that's what I would try first.
I managed to run a job with a simple python script with correct logs.
Then a job with the dask-scheduler
#!/bin/bash -l
#SBATCH --job-name=pyth_script # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=00:25:00
module purge
module load gnu python/3.6.5
srun dask-scheduler --scheduler-file /pr008/user/scheduler.json --interface ib0
with outputs:
scheduler.json
{
"type": "Scheduler",
"id": "Scheduler-0ba502f4-a060-48a1-b51d-e52f9f713802",
"address": "tcp://192.168.132.55:8786",
"services": {},
"workers": {}
}
cat jobname.806969.err
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Web dashboard not loaded. Unable to import bokeh
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://192.168.132.55:8786
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-11vlqb65
distributed.scheduler - INFO - -----------------------------------------------
jobname.806969.out is empty
And then a job with the dask-worker
#!/bin/bash -l
#SBATCH --job-name=pyth_script # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=00:25:00
module purge
module load gnu python/3.6.5
srun dask-worker --scheduler-file /pr008/user/scheduler.json
with outputs:
cat jobname.806970.err
distributed.nanny - INFO - Start Nanny at: 'tcp://192.168.116.9:44626'
distributed.worker - INFO - Start worker at: tcp://192.168.116.9:36536
distributed.worker - INFO - Listening to: tcp://192.168.116.9:36536
distributed.worker - INFO - nanny at: 192.168.116.9:44626
distributed.worker - INFO - Waiting to connect to: tcp://192.168.132.55:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 20
distributed.worker - INFO - Memory: 67.67 GB
distributed.worker - INFO - Local Directory: /pr008/user/worker-962pinux
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.132.55:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
jobname.806970.out is empty
Everything seems fine. Also, the address is different from the previous one and is not among those shown by ifconfig.
Then in ipython in my conda environment:
from dask.distributed import Client
client = Client(scheduler_file='/pr008/user/scheduler.json')
client
This gives the error
Out[3]: ---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/core/formatters.py in __call__(self, obj)
700 type_pprinters=self.type_printers,
701 deferred_pprinters=self.deferred_printers)
--> 702 printer.pretty(obj)
703 printer.flush()
704 return stream.getvalue()
~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/lib/pretty.py in pretty(self, obj)
397 if cls is not object \
398 and callable(cls.__dict__.get('__repr__')):
--> 399 return _repr_pprint(obj, self, cycle)
400
401 return _default_pprint(obj, self, cycle)
~/miniconda3/envs/alekos/lib/python3.6/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
687 """A pprint that just redirects to the normal repr function."""
688 # Find newlines and replace them with p.break_()
--> 689 output = repr(obj)
690 for idx,output_line in enumerate(output.splitlines()):
691 if idx:
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in __repr__(self)
776 workers = info.get("workers", {})
777 nworkers = len(workers)
--> 778 nthreads = sum(w["nthreads"] for w in workers.values())
779 text = "<%s: %r processes=%d threads=%d" % (
780 self.__class__.__name__,
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in <genexpr>(.0)
776 workers = info.get("workers", {})
777 nworkers = len(workers)
--> 778 nthreads = sum(w["nthreads"] for w in workers.values())
779 text = "<%s: %r processes=%d threads=%d" % (
780 self.__class__.__name__,
KeyError: 'nthreads'
Good to see that now the Dask workers managed to connect to the Dask scheduler as you can see through this line in the Dask worker log:
distributed.worker - INFO - Registered to: tcp://192.168.132.55:8786
It is a bit of a guess, but maybe there is a Dask version mismatch between your Dask client, which uses your miniconda python, and your Dask workers and scheduler, which use the python provided by your cluster sysadmins (i.e. the one that you get from module load python/3.6.5).
As general guidance, using the same python environment (not only the same Dask version) for your client, scheduler and workers is strongly recommended.
In your case I am guessing you can add the necessary bash commands to activate your conda environment in your SLURM submission scripts (side comment: dask-jobqueue helps a bit with this, but that is a different topic).
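Once the client connects, a quick way to check that the client, scheduler and workers all run the same versions is something like this (reusing the scheduler file path from your snippets):

```python
from dask.distributed import Client

client = Client(scheduler_file='/pr008/user/scheduler.json')
# Raises an error if the client, scheduler and workers report mismatched package versions
client.get_versions(check=True)
```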
@lesteve thank you very much for your help so far. Indeed my conda environment's python version was 3.6.9, and I changed it to 3.6.5 to be the same as the system's. I'm also activating the conda environment in the SLURM submission scripts:
module load gnu python/3.6.5
source /pr008/user/.bashrc
source activate /pr008/user/miniconda3/bin/activate /pr008/user/miniconda3/envs/alekos
but I have the same error. Then I checked the versions because there might be some problem there:
from dask.distributed import Client
client = Client(scheduler_file='/pr008/user/scheduler.json')
client.get_versions(check=True)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-3-4a95fbecd8f8> in <module>
----> 1 client.get_versions(check=True)
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in get_versions(self, check, packages)
3546 >>> c.get_versions(packages=['sklearn', 'geopandas']) # doctest: +SKIP
3547 """
-> 3548 return self.sync(self._get_versions, check=check, packages=packages)
3549
3550 async def _get_versions(self, check=False, packages=[]):
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
766 else:
767 return sync(
--> 768 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
769 )
770
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
332 if error[0]:
333 typ, exc, tb = error[0]
--> 334 raise exc.with_traceback(tb)
335 else:
336 return result[0]
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/utils.py in f()
316 if callback_timeout is not None:
317 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 318 result[0] = yield future
319 except Exception as exc:
320 error[0] = sys.exc_info()
~/miniconda3/envs/alekos/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in _get_versions(self, check, packages)
3587
3588 raise ValueError(
-> 3589 "Mismatched versions found\n\n%s" % ("\n\n".join(errs))
3590 )
3591
ValueError: Mismatched versions found
bokeh
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 1.4.0 |
| scheduler | None |
| tcp://192.168.111.46:34311 | None |
+----------------------------+---------+
cloudpickle
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 1.2.2 |
| scheduler | 0.5.3 |
| tcp://192.168.111.46:34311 | 0.5.3 |
+----------------------------+---------+
dask
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 2.9.0 |
| scheduler | 2.0.0 |
| tcp://192.168.111.46:34311 | 2.0.0 |
+----------------------------+---------+
dask_ml
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 1.1.1 |
| scheduler | 1.0.0 |
| tcp://192.168.111.46:34311 | 1.0.0 |
+----------------------------+---------+
distributed
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 2.9.0 |
| scheduler | 1.28.1 |
| tcp://192.168.111.46:34311 | 1.28.1 |
+----------------------------+---------+
numpy
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 1.17.4 |
| scheduler | 1.16.4 |
| tcp://192.168.111.46:34311 | 1.16.4 |
+----------------------------+---------+
pandas
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 0.25.3 |
| scheduler | 0.23.4 |
| tcp://192.168.111.46:34311 | 0.23.4 |
+----------------------------+---------+
toolz
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 0.10.0 |
| scheduler | 0.9.0 |
| tcp://192.168.111.46:34311 | 0.9.0 |
+----------------------------+---------+
tornado
+----------------------------+---------+
| | version |
+----------------------------+---------+
| client | 6.0.3 |
| scheduler | 6.0.2 |
| tcp://192.168.111.46:34311 | 6.0.2 |
+----------------------------+---------+
A hell of a lot of mismatches, and I cannot understand where they come from... How can I upgrade the scheduler's packages?
The error seems to indicate that your jobs don't use your miniconda environment for the dask worker and the dask scheduler. You should probably ask your cluster IT if you get stuck, but if I were you I would try something like this in your submission script:
# don't use module load stuff for python and activate your miniconda environment
# maybe you need to source your .bashrc, depending on your setup
conda activate your-conda-environment
# To debug make sure that your conda environment python is used
which python
python -c 'import distributed; print(distributed.__file__); print(distributed.__version__)'
Dear @lesteve, happy new year!
That solved the issue. I can now manually connect the workers with the scheduler. Thank you very much for your help and cooperation. I guess it is impossible to automate the process in my case but it is ok.
I am trying to use RandomizedSearchCV with joblib and dask, and I face the same issue as in #959. I started 2 workers, so I have 40 cores.
import pandas as pd
from sklearn.ensemble import ExtraTreesClassifier
import joblib
from sklearn import model_selection as ms
from dask.distributed import Client
client = Client(scheduler_file='/pr008/user/scheduler.json')
df = pd.read_csv('dataset.csv', thousands = ',' , low_memory=False)
ExtraTreesClassifier_params = {'criterion': ['gini', 'entropy'], 'max_features': [3, 5, 10, 25, 50, 'auto', 'log2', None], 'n_estimators': [100],
'max_depth': [None], 'min_samples_split': [2, 5, 10, 0.1], 'min_impurity_decrease': [1e-7, 1e-6, 1e-5, 1e-4, 1e-3] + [0],
'warm_start': [True, False], 'min_samples_leaf': [1, 2, 5, 10, 0.1], 'class_weight' : ['balanced'],
'bootstrap': [True, False]}
n_iter_search = 200
clf = ExtraTreesClassifier()
sk_random_search = ms.RandomizedSearchCV(clf, ExtraTreesClassifier_params, verbose = 2, scoring = 'f1_macro', n_iter=n_iter_search, n_jobs=-1)
with joblib.parallel_backend('dask'):
    sk_random_search.fit(df.drop(columns=['class']), df['class'])
And I get the following
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 40 concurrent workers.
Then a bunch of times this error
distributed.client - ERROR - Error in callback <function DaskDistributedBackend.apply_async.<locals>.callback_wrapper at 0x7ff35833ae18> of <Future: error, key: _fit_and_score-batch-57cb4d49e2f943209d604df2c9e71766>:
Traceback (most recent call last):
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 288, in execute_callback
fn(fut)
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 260, in callback_wrapper
result = future.result()
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 223, in result
raise exc.with_traceback(tb)
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 106, in __call__
results.append(func(*args, **kwargs))
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_validation.py", line 516, in _fit_and_score
estimator.fit(X_train, y_train, **fit_params)
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 340, in fit
self.n_classes_ = self.n_classes_[0]
TypeError: 'int' object is not subscriptable
and a bunch of times this error
distributed.client - ERROR - Error in callback <function DaskDistributedBackend.apply_async.<locals>.callback_wrapper at 0x7fa7b52a3ae8> of <Future: cancelled, key: _fit_and_score-batch-817993e9946f404f9917b480ef83fdc1>:
Traceback (most recent call last):
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 288, in execute_callback
fn(fut)
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py", line 260, in callback_wrapper
result = future.result()
File "/users/pr008/user/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py", line 225, in result
raise result
concurrent.futures._base.CancelledError: _fit_and_score-batch-817993e9946f404f9917b480ef83fdc1
and in the end
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-14e28483608f> in <module>
2 with joblib.parallel_backend('dask'):
----> 3 sk_random_search.fit(data_scaled_all, df['class'])
~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in fit(self, X, y, groups, **fit_params)
686 return results
687
--> 688 self._run_search(evaluate_candidates)
689
690 # For multi-metric evaluation, store the best_index_, best_params_ and
~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in _run_search(self, evaluate_candidates)
1467 evaluate_candidates(ParameterSampler(
1468 self.param_distributions, self.n_iter,
-> 1469 random_state=self.random_state))
~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_search.py in evaluate_candidates(candidate_params)
665 for parameters, (train, test)
666 in product(candidate_params,
--> 667 cv.split(X, y, groups)))
668
669 if len(out) < 1:
~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
1014
1015 with self._backend.retrieval_context():
-> 1016 self.retrieve()
1017 # Make sure that we get a last message telling us we are done
1018 elapsed_time = time.time() - self._start_time
~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
908 self._output.extend(job.get(timeout=self.timeout))
909 else:
--> 910 self._output.extend(job.get())
911
912 except BaseException as exception:
~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py in get()
268
269 def get():
--> 270 return ref().result()
271
272 future.get = get # monkey patch to achieve AsyncResult API
~/miniconda3/envs/alekos/lib/python3.6/site-packages/distributed/client.py in result(self, timeout)
221 if self.status == "error":
222 typ, exc, tb = result
--> 223 raise exc.with_traceback(tb)
224 elif self.status == "cancelled":
225 raise result
~/miniconda3/envs/alekos/lib/python3.6/site-packages/joblib/_dask.py in __call__()
104 kwargs = {k: v(data) if isinstance(v, itemgetter) else v
105 for (k, v) in kwargs.items()}
--> 106 results.append(func(*args, **kwargs))
107 return results
108
~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/model_selection/_validation.py in _fit_and_score()
514 estimator.fit(X_train, **fit_params)
515 else:
--> 516 estimator.fit(X_train, y_train, **fit_params)
517
518 except Exception as e:
~/miniconda3/envs/alekos/lib/python3.6/site-packages/sklearn/ensemble/forest.py in fit()
338 # Decapsulate classes_ attributes
339 if hasattr(self, "classes_") and self.n_outputs_ == 1:
--> 340 self.n_classes_ = self.n_classes_[0]
341 self.classes_ = self.classes_[0]
342
TypeError: 'int' object is not subscriptable
Should I close this issue and start a new one for this or comment at #959?
That solved the issue. I can now manually connect the workers with the scheduler. Thank you very much for your help and cooperation.
Great to know that it works! It would be even greater if you could post the script that you are now using. This could be useful for other people who bump into a similar issue as yours!
I guess it is impossible to automate the process in my case but it is ok.
I am 95% confident that if I were sitting next to you I could find a way to make it work in ~1h with SLURMCluster. This kind of problem is not super easy to solve on GitHub, unfortunately...
About your error, it is hard to tell without running your code whether it is a problem with:
- your code vs scikit-learn vs joblib. Suggestion: Try to remove with joblib.parallel_backend('dask'): and see whether the problem persists. You may need to run on a subset of your parameter grid or on a subset of your data to run in a reasonable amount of time on a single machine. You can also try to run with n_jobs=1 to be in sequential mode and see whether the problem persists.
- dask/distributed vs dask-jobqueue. Suggestion: try to run the same code with a dask.distributed.LocalCluster (see the sketch below) to see whether the problem persists. I would guess it is something in dask/distributed rather than dask-jobqueue, but I may be wrong about this.
Also, it is very likely not directly related to https://github.com/joblib/joblib/issues/959, because you don't do nested parallelism. In other words, you have a single level of parallelism at the RandomizedSearchCV level and none at the estimator level. https://github.com/joblib/joblib/issues/959 has nested parallelism: GridSearchCV + ColumnTransformer. It could be the manifestation of the same underlying cause; again, this is very hard to tell...
Great to know that it works! It would be even greater if you could post the script that you are now using. This could be useful for other people who bump into a similar issue as yours!
Sure, it is the least I can do.
The SLURM script for the scheduler is:
#!/bin/bash -l
#SBATCH --job-name=pyth_script # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=20
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=01:00:00
module purge
source /users/pr008/user/.bashrc
conda activate alekos
srun dask-scheduler --scheduler-file /pr008/user/scheduler.json --interface ib0
and for the worker
#!/bin/bash -l
#SBATCH --job-name=pyth_script # Job name
#SBATCH --output=jobname.%j.out # Stdout (%j expands to jobId)
#SBATCH --error=jobname.%j.err # Stderr (%j expands to jobId)
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=20
#SBATCH --partition=compute
#SBATCH --account=pr008
#SBATCH --time=01:00:00
module purge
source /users/pr008/user/.bashrc
conda activate alekos
srun dask-worker --scheduler-file /pr008/user/scheduler.json
The only drawback of this approach is that I need to allocate one node for the scheduler.
I am 95% confident that if I were sitting next to you I could find a way to make it work in ~1h with SLURMCluster. This kind of problem is not super easy to solve on GitHub, unfortunately...
Well, I'm pretty sure you would indeed XD. I think the reason I cannot make it work with SLURMCluster is that it cannot see the IP of the compute node from the login node. When I submit the above scripts, the scheduler and the workers connect to an IP that is not shown by ifconfig. I tried to use that IP with the --host parameter in SLURMCluster, but it failed.
Try to remove with joblib.parallel_backend('dask'): and see whether the problem persists.
Removing it, it uses the login node CPUs and runs just fine.
You may need to run on a subset of your parameter grid or on a subset of your data to run in a reasonable amount of time on a single machine.
That did not help. Either way, I only had 200 iterations in RandomizedSearchCV, which is very small.
You can also try to run with n_jobs=1 to be in sequential mode and see whether the problem persists.
Setting n_jobs=1 does not give sequential mode, which is weird:
[Parallel(n_jobs=1)]: Using backend DaskDistributedBackend with 40 concurrent workers.
and the same errors as before.
If I remove joblib.parallel_backend('dask'):, it uses sequential mode correctly.
try to run the same code with a dask.distributed.LocalCluster
LocalCluster works. In general, Dask works on the login node.
OK, thanks for the details. There seem to be some problems with the joblib dask backend, and maybe there are additional issues with dask-jobqueue and also some of your cluster's quirks; hard to tell...
A work-around you could try is to use dask_ml.model_selection.RandomizedSearchCV, i.e. you would not be using the joblib dask backend.
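A rough sketch of what that could look like, reusing the estimator, parameter grid and dataframe from your earlier snippet (untested, so treat it as a starting point rather than a recipe):

```python
from dask.distributed import Client
from dask_ml.model_selection import RandomizedSearchCV
from sklearn.ensemble import ExtraTreesClassifier

client = Client(scheduler_file='/pr008/user/scheduler.json')

# dask-ml's search runs its work through the current Dask client,
# so no joblib.parallel_backend('dask') block is needed here.
dask_search = RandomizedSearchCV(
    ExtraTreesClassifier(),
    ExtraTreesClassifier_params,  # the parameter dict from the earlier snippet
    n_iter=200,
    scoring='f1_macro',
)
dask_search.fit(df.drop(columns=['class']), df['class'])
```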
Out of interest, do you launch your submission scripts in https://github.com/dask/dask-jobqueue/issues/376#issuecomment-572005474 with salloc rather than sbatch? I find it a bit unusual to have srun commands in submission scripts, but I am not a SLURM expert at all.
A work-around you could try is to use dask_ml.model_selection.RandomizedSearchCV, i.e. you would not be using the joblib dask backend.
I tried that and it worked, but it is super slow. It took almost 5 minutes for 20 iterations. To compare, on the login node with fewer CPUs it took less than 10 seconds.
Out of interest, do you launch your submission scripts in #376 (comment) with salloc rather than sbatch? I find it a bit unusual to have srun commands in submission scripts, but I am not a SLURM expert at all.
I also tried salloc, but I receive an error. I follow the HPC documentation on how to submit jobs.
I tried that and it worked, but it is super slow. It took almost 5 minutes for 20 iterations. To compare, on the login node with fewer CPUs it took less than 10 seconds.
Looking at the dashboard could give you some insight into why this is so slow. See this for more details. Wild guess: maybe the dataframe is serialized and sent to the workers for each computation. IIRC you can use client.scatter to send the data to the workers once, and then each computation should be quicker.
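For example, something along these lines might help (the scatter argument of joblib's dask backend is an assumption about the installed joblib version; scattering the data with client.scatter directly is the same idea):

```python
import joblib

X = df.drop(columns=['class'])
y = df['class']

# Pre-scatter the data so it is shipped to the workers once instead of with every task.
# (The `scatter` argument is assumed to be supported by the installed joblib version.)
with joblib.parallel_backend('dask', scatter=[X, y]):
    sk_random_search.fit(X, y)
```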
I propose to close this issue, as the initial problem was solved and there is no more activity.
Closing as stale.