dask-jobqueue
How to get the same MPI/OpenMP mapping using dask-jobqueue
Hi everybody,
I am using the following script to run a hybrid (MPI/OpenMP) application on a cluster (each node has 20 physical cores) via SLURM:
#!/bin/bash
JOB=458754
for nrun in {1..10}
do
  for npop in 600
  do
    for niter in 100
    do
      for boundary in reflecting
      do
        SAIDA=$(sbatch -o "${nrun}runs${npop}x${niter}_${boundary}_gbest_canonical_cc_em_375x369_5000_2_2.txt" --ntasks=300 --cpus-per-task=2 --ntasks-per-node=10 --ntasks-per-socket=5 -p cluster128g --requeue ./script.sh $nrun $npop $niter $boundary 2)
        JOB=$SAIDA
      done
    done
  done
done
echo "last job was $JOB"
Thus, I map 10 MPI processes onto each node of the cluster, each of which spawns two OpenMP threads (10 MPI processes x 2 OpenMP threads = 20 cores). I rewrote my application in Python and I would like to use Dask to parallelize it. However, in my first attempts I could not map more than one worker to a node. How can I do that, i.e. map more than one Dask worker onto each node so that I take advantage of all the resources? I looked for an example through the issues list but could not find any. Any ideas?
Thanks
If I understand you correctly (and I'm not sure I do), you want to run an MPI/OpenMP application as a task? dask_jobqueue doesn't handle that case for you since it doesn't deal with MPI (tasks are restricted to a single node; there's no awareness of how to launch an MPI job).
We're working on a library that builds on top of jobqueue and targets that use case, you could try it out but it is still pretty alpha (and not so well documented): https://github.com/E-CAM/jobqueue_features
Hi @ofmla, not sure I understand you either. What @ocaisa says is indeed correct.
But if you want to run 10 worker processes with 2 cores each, you can do that with something like:
cluster = SLURMCluster(processes=10,   # 10 processes means 10 individual workers
                       cores=20,       # 10 workers will share 20 cores: so 2 cores per worker
                       memory='120GB', ...)
However, if you want to submit from Python/Dask a task that itself needs two cores, then you should just use Dask for task scheduling, and the kwargs can be different, for example:
cluster = SLURMCluster(cores=10,       # only ten tasks simultaneously
                       memory='120GB',
                       job_cpu=20,     # but request complete nodes
                       ...)
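For what it's worth, here is a minimal sketch of the first layout, with the OpenMP thread count pinned per worker and a quick check of what the scheduler sees once the job starts. The memory value, the env_extra line, and the nthreads/ncores key name are assumptions that depend on your site and on the installed dask-jobqueue/distributed versions:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# 10 worker processes sharing 20 cores -> 2 cores per worker, as in the first snippet above
cluster = SLURMCluster(processes=10,
                       cores=20,
                       memory='120GB',                          # assumed per-node memory
                       env_extra=['export OMP_NUM_THREADS=2'])  # assumption: pin 2 OpenMP threads per worker
cluster.scale(10)        # 10 workers -> a single Slurm job with this configuration

client = Client(cluster)
# Each worker should report 2 threads ('nthreads' in recent distributed, 'ncores' in older releases)
for address, info in client.scheduler_info()['workers'].items():
    print(address, info.get('nthreads', info.get('ncores')))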
Thanks for your answer @ocaisa. I apologize for not explaining my problem properly; let me explain it better. I have an optimizer (particle swarm optimization) written in a low-level language. The optimizer was implemented using a hybrid (MPI/OpenMP) master/worker programming paradigm where the particles' fitness evaluation is handled through dynamic scheduling. I rewrote the optimizer in Python with the help of a framework named DEAP (https://github.com/DEAP/deap), which provides an easy way to evaluate the individuals of a population on several cores in parallel; the user merely needs to provide an implementation of a map function. To parallelize over a large cluster I am using the Dask library. The cluster I am using has nodes with 20 physical cores, but the fitness evaluation uses only a few cores due to the size of the problem I am working on. So, with my low-level implementation, I can map several MPI processes (each process performs the fitness evaluation of one particle) onto one node. If I understand correctly, an MPI process would be equivalent to a Dask worker. I just want to take advantage of all the resources (20 cores), but I am getting jobs that use only 2 cores per node when setting up my cluster as follows:
cluster = SLURMCluster(cores=2,
                       memory="10GB",
                       queue="standard",
                       interface="ib0",
                       job_extra=['--time=02:00:00 --requeue'])
print("Waiting for workers")
cluster.start_workers(5)
c = Client(cluster)
What can I do in order to use all the cores on each node? I hope you can understand me now ;)
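A quick way to see what such a configuration actually requests is to inspect the generated job script and the per-worker core counts (a sketch; Client.ncores is complemented by Client.nthreads in newer distributed releases):

print(cluster.job_script())   # shows the #SBATCH resources each Slurm job will ask for
print(c.ncores())             # maps each connected worker address to its core count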
There are a few very similar discussions going on in the dask-mpi repo. https://github.com/dask/dask-mpi/issues/25 might be particularly interesting for you.
@ofmla, did you see my comment above? I think this is what you want.
@guillaumeeb thanks for your readiness to support and assist, and sorry for the late reply; I had a deadline recently which took up my time. I am still confused about how dask-jobqueue operates and I am facing some issues. Anyway, I will perform some tests following your comment and let you know if it works.
This is getting stale, any news on this @ofmla?
So sorry, @guillaumeeb. I am so used to working with low-level programming languages that I find it hard to rewrite my apps in Python. Since I could not make my app work, I ended up stopping for a while and forgot to respond to you. I tried the first option, as it fits what I want:
cluster = SLURMCluster(processes=10,   # 10 processes means 10 individual workers
                       queue="standard",
                       interface="ib0",
                       cores=20,       # 10 workers will share 20 cores: so 2 cores per worker
                       memory='120GB',
                       job_extra=['--time=72:00:00 --requeue'])

header_lines = cluster.job_header.split('\n')
mem_pos = find_mem_pos(header_lines)
header_lines = header_lines[:mem_pos] + header_lines[mem_pos+1:]
cluster.job_header = '\n'.join(header_lines)

print("Waiting for workers")
cluster.start_workers(20)
c = Client(cluster)
print(c.scheduler_info())
print(cluster.job_script())
In that case the squeue command returns the following:

JOBID  PARTITION  NAME      USER    ST  TIME     NODES  NODELIST(REASON)
45475  standard   dask-wor  oscarm  R   INVALID  1      c021
However, the app stalls; this was the output from the job. Even running as before (jobs using only 2 cores per node, in spite of having 20), I cannot complete one iteration of the optimization because some jobs do not finish their work. I got messages like these:
unresponsive.txt collections.txt
Both the CPU percentage and the unresponsive time increase progressively over time. There is a memory problem and I do not know where it is coming from. I distribute 360 tasks over 25 workers; every worker has access to the true data (462M) and creates chunks of data of ~9.23M (50 chunks), each of which is compared with the corresponding chunk of the true data sequentially. I use a clear_cache function after each chunk is generated. If I considerably increase the memory in the cluster definition, I can complete one iteration and the messages about 'garbage collection released' disappear, but not those about unresponsive loops, so the performance is still degraded. Ultimately, you can close this issue; I think I'll search for help on Stack Overflow.
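(As an aside on the job_header editing shown above: depending on the installed dask-jobqueue release, the generated --mem directive can also be dropped at construction time. The header_skip keyword below is an assumption to verify against your version; newer releases rename it job_directives_skip.)

# Sketch only, requires a dask-jobqueue release that supports header_skip
cluster = SLURMCluster(processes=10, cores=20, memory='120GB',
                       queue="standard", interface="ib0",
                       job_extra=['--time=72:00:00', '--requeue'],
                       header_skip=['--mem'])   # omit the generated '#SBATCH --mem=...' line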
cluster.start_workers(20)
This should start 2 Slurm jobs according to your SLURMCluster configuration; I only see one in your squeue output. And you should use:
cluster.scale(20)
And yes, it looks like a memory problem, which seems weird given the amount of data you mention; you should complete one iteration easily. It's difficult to go further, as I have no access to your complete code. Do you get those messages from the Slurm workers' output, or from your main app, i.e. from the scheduler?
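(To make the arithmetic explicit: the argument to scale counts worker processes, and each Slurm job in this configuration starts processes=10 of them, so for example:)

cluster.scale(20)    # 20 workers / 10 processes per job -> 2 Slurm jobs
cluster.scale(200)   # would submit 20 jobs of 10 workers each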
cluster.start_workers(20)
This should start 2 Slurm jobs according to your SLURMCluster configuration; I only see one in your squeue output. And you should use:
cluster.scale(20)
I tried with cluster.scale(20) and got the 2 jobs. However, nothing changed.
Actually, the app does start after some time, but the Slurm jobs take much longer than normal. The delay was excessive and I had to kill the app. Maybe it is directly related to the memory problem.
And yes, it looks like a memory problem, which seems weird given the amount of data you mention; you should complete one iteration easily. It's difficult to go further, as I have no access to your complete code. Do you get those messages from the Slurm workers' output, or from your main app, i.e. from the scheduler?
I'll be back with a small Python script that reproduces the problem.
My app depends on two frameworks, DEAP (Distributed Evolutionary Algorithms in Python, https://github.com/DEAP/deap) and Devito (https://github.com/opesci/devito), Python frameworks for evolutionary algorithms and automated finite-difference computation, respectively. So I decided to leave them out and try to simulate what is done for each particle/individual, that is, generate chunks of data and compare them with the true data to compute the cost function. Below are the data used and the small code:
https://sesibahia-my.sharepoint.com/:u:/g/personal/oscar_ladino_fieb_org_br/ETC6Ocq1yJJElN-jS2fExXAB1BpT5rMO_S9hRd5ncl3VWA?e=Y9LIeg
import numpy
import sys
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, wait
def find_mem_pos(header_lines):
    for i, line in enumerate(header_lines):
        if '--mem=' in line:
            return i

def my_task(i, in_data):
    numpy.random.seed(i)
    error = 0.
    print(len(in_data[0][0]))
    for j in range(len(in_data[0][0])):
        local_data = numpy.random.random(size=(6559, 369))
        res = get_value(local_data, in_data[:, :, j])
        error += res
    return (numpy.array(error.data),)

def get_value(dcalc, dobs):
    return 0.5 * numpy.sum((dcalc - dobs)**2.)
cluster = SLURMCluster(cores=2,
                       memory="10GB",
                       queue="standard",
                       interface="ib0")
header_lines = cluster.job_header.split('\n')
mem_pos = find_mem_pos(header_lines)
header_lines = header_lines[:mem_pos] + header_lines[mem_pos+1:]
cluster.job_header = '\n'.join(header_lines)
print(cluster.job_script())
print("Waiting for workers")
cluster.scale(20)
client = Client(cluster)
# Load the data
shots = numpy.fromfile('shots.file', dtype=numpy.float32)
shots = numpy.reshape(shots, (6559, 369, 50))
[remote_data] = client.scatter([shots], broadcast=True)
features=client.map(my_task, range(360),in_data=remote_data)
for fit in features:
    print(fit.result())
This code works properly and I don't have any issue with memory. However, when Devito is added to generate the data through finite-difference computations, I face problems with memory, even using a clear_cache function as mentioned earlier. I will ask the Devito developers for help, and when the problem is solved I'll try the cluster configuration above again.
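(For reference, a sketch of where such a cache-clearing call would sit in the per-chunk loop. forward_model is a hypothetical placeholder for the Devito computation, and the import location of clear_cache may differ between Devito versions:)

from devito import clear_cache   # assumption: exposed at the top level in the Devito release used here

def my_task_devito(i, in_data):
    # Hypothetical variant of my_task in which each chunk comes from a Devito run
    error = 0.
    for j in range(in_data.shape[2]):
        local_data = forward_model(i, j)                  # placeholder for the finite-difference solve
        error += get_value(local_data, in_data[:, :, j])
        del local_data                                    # drop the chunk before the next solve
        clear_cache()                                     # ask Devito to release its cached operators
    return (error,)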
@ofmla any news on this one?
This is probably due to the external framework not freeing up memory correctly...
Sorry, @guillaumeeb. I was busy with other stuff and I shelved my project for a while, as there is definitely a problem with the other framework I am using. When the maintainers figure out a fix for the problem, I will try the options you suggested again. Thanks for your help.
Closing this issue as stale. @ofmla if you ever work on this again, feel free to reopen.