
How to get the same MPI/OpenMP mapping using dask-jobqueue

ofmla opened this issue 5 years ago • 13 comments

Hi everybody,

I am using the following script to run a hybrid (MPI/OpenMP) application on a cluster (each node has 20 physical cores) via SLURM:

#!/bin/bash
JOB=458754
for nrun in {1..10}
do
  for npop in 600
  do
    for niter in 100
    do
      for boundary in reflecting
      do
        SAIDA=$(sbatch -o "${nrun}runs${npop}x${niter}_${boundary}_gbest_canonical_cc_em_375x369_5000_2_2.txt" --ntasks=300 --cpus-per-task=2 --ntasks-per-node=10 --ntasks-per-socket=5 -p cluster128g --requeue ./script.sh $nrun $npop $niter $boundary 2)
        JOB=$SAIDA
      done
    done
  done
done
echo "last job was $JOB"

Thus, I map 10 MPI processes onto each node of the cluster, each of which spawns two OpenMP threads (10 MPI processes x 2 OpenMP threads = 20 cores). I rewrote my application in Python and I would like to use Dask to parallelize it. However, in my first attempts I couldn't map more than one worker to a node. How can I do that? I mean, map more than one Dask worker onto each node in such a way that I take advantage of all the resources. I looked for an example through the issues list but could not find any. Any ideas?

Thanks

ofmla · Mar 21 '19 19:03

If I understand you correctly (and I'm not sure I do), you want to run an MPI/OpenMP application as a task? dask-jobqueue doesn't handle that case for you, since it doesn't deal with MPI (tasks are restricted to a single node; there is no awareness of how to launch an MPI job).

We're working on a library that builds on top of jobqueue and targets that use case, you could try it out but it is still pretty alpha (and not so well documented): https://github.com/E-CAM/jobqueue_features

ocaisa · Mar 22 '19 10:03

Hi @ofmla, not sure I understand you either. What @ocaisa says is indeed correct.

But if you want to run 10 worker processes with 2 cores each, you can do that with something like:

cluster = SLURMCluster(processes=10,   # 10 processes means 10 individual workers
                       cores=20,       # the 10 workers will share 20 cores, so 2 cores per worker
                       memory='120GB', ...)

However, if you want to submit from Python/Dask a task that needs two cores, then you should just use Dask for task scheduling, and the kwargs can be different, for example:

cluster = SLURMCluster(cores=10,       # Only ten tasks simultaneously
                       memory='120GB',
                       job_cpu=20,     # but request complete nodes
                       ...)
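Putting the first option together with the node layout from your Slurm script, a more complete sketch could look like the following (the queue, memory and walltime values are only placeholders, and env_extra is just one way to keep OpenMP from oversubscribing the 2 cores each worker gets):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# One Slurm job per node: 10 worker processes sharing 20 cores (2 threads each),
# roughly mirroring the 10 MPI ranks x 2 OpenMP threads layout.
cluster = SLURMCluster(queue='cluster128g',            # placeholder partition name
                       cores=20,                       # cores per Slurm job (one node)
                       processes=10,                   # 10 workers -> 2 threads per worker
                       memory='120GB',                 # memory per Slurm job, shared by the workers
                       walltime='02:00:00',
                       env_extra=['export OMP_NUM_THREADS=2'])  # cap OpenMP threads per worker

cluster.scale(30)   # 30 workers in total -> 3 Slurm jobs of 10 workers each
client = Client(cluster)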

guillaumeeb · Mar 22 '19 12:03

Thanks for your answer @ocaisa. I apologize for not explaining my problem properly. Let me explain it better. I have an optimizer (particle swarm optimization) written in a low-level language. The optimizer was implemented using a hybrid (MPI/OpenMP) master/worker programming paradigm where the particles' fitness evaluation is handled through dynamic scheduling. I rewrote the optimizer in Python with the help of a framework named DEAP (https://github.com/DEAP/deap), which provides an easy way to evaluate the individuals in a population on several cores in parallel; the user merely needs to provide an implementation of a map function. To parallelize over a large cluster I am using the Dask library. The cluster I am using has nodes with 20 physical cores, but the fitness evaluation uses only a few cores due to the size of the problem I am working on. So, with my low-level implementation, I can map several MPI processes (each process performs the fitness evaluation of a particle) onto one node. If I understand correctly, an MPI process would be equivalent to a Dask worker. I just want to take advantage of all the resources (20 cores), but I am getting jobs that use only 2 cores per node when setting up my cluster as follows:

    cluster = SLURMCluster(cores=2,
                           memory="10GB",
                           queue="standard",
                           interface="ib0",
                           job_extra=['--time=02:00:00 --requeue'])
    print("Waiting for workers")
    cluster.start_workers(5)
    c = Client(cluster)

What can I do in order to use all the cores on each node? I hope you can understand me now ;)
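For context, the map-based wiring I mean is roughly the sketch below (illustrative only; DEAP just needs a map-like callable registered on the toolbox, so the fitness evaluations can go through the Dask client):

from dask.distributed import Client
from deap import base

client = Client(cluster)   # the SLURMCluster defined above

def dask_map(func, *iterables):
    # Drop-in replacement for the built-in map: evaluate on the Dask workers
    futures = client.map(func, *iterables)
    return client.gather(futures)

toolbox = base.Toolbox()
toolbox.register("map", dask_map)   # DEAP will now run fitness evaluations through Dask

DEAP's algorithms then call toolbox.map(toolbox.evaluate, population), so the evaluations land on whatever workers the cluster provides.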

ofmla · Mar 22 '19 12:03

There are a few very similar discussions going on in the dask-mpi repo. https://github.com/dask/dask-mpi/issues/25 might be particularly interesting for you.

willirath · Mar 22 '19 12:03

@ofmla, did you see my comment above? I think this is what you want.

guillaumeeb · Mar 29 '19 20:03

@guillaumeeb thanks for your readiness to support and assist, and sorry for the late reply; I had a deadline recently that took up my time. I am still confused about how dask-jobqueue operates and I am facing some issues. Anyway, I will perform some tests following your comment and let you know if it works.

ofmla · Apr 02 '19 13:04

This is getting stale, any news on this @ofmla?

guillaumeeb · May 09 '19 20:05

So sorry, @guillaumeeb. I am so used to working with low-level programming languages that I find it hard to rewrite my apps in Python. Since I could not make my app work, I ended up stopping for a while and forgot to respond to you. I tried the first option as it fits what I want:

 cluster = SLURMCluster(processes=10,   # 10 processes means 10 individual workers
                        cores=20,       # the 10 workers will share 20 cores, so 2 cores per worker
                        memory='120GB',
                        queue="standard",
                        interface="ib0",
                        job_extra=['--time=72:00:00 --requeue'])
 header_lines = cluster.job_header.split('\n')
 mem_pos = find_mem_pos(header_lines)
 header_lines = header_lines[:mem_pos] + header_lines[mem_pos+1:]
 cluster.job_header = '\n'.join(header_lines)
 print("Waiting for workers")
 cluster.start_workers(20)
 c = Client(cluster)
 print(c.scheduler_info())
 print(cluster.job_script())

In that case, the squeue command returns the following:

 JOBID PARTITION     NAME   USER ST    TIME NODES NODELIST(REASON)
 45475  standard dask-wor oscarm  R INVALID     1 c021

However, the app stalls. This is the output from the job:

slurm-45475.out.txt

Even running as before (jobs using only 2 cores per node, in spite of having 20), I cannot complete one iteration of the optimization because some jobs do not complete their work. I got messages like these:

unresponsive.txt collections.txt

Both the CPU percentage and the unresponsive time increase progressively over time. There is a memory problem and I do not know where it is coming from. I distributed 360 tasks across 25 workers; every worker has access to the true data (462 MB) and creates chunks of data of ~9.23 MB (50 chunks), each of which is compared with the corresponding chunk of true data sequentially. I used a clear_cache function after each chunk is generated. If I considerably increase the memory in the cluster definition, I can complete one iteration and the messages about 'garbage collection released' disappear, but not those about unresponsive loops, and because of that the performance is degraded. Ultimately, you can close this issue; I think I'll search for help on Stack Overflow.

ofmla · May 15 '19 20:05

cluster.start_workers(20)

This should start 2 Slurm jobs according to your SLURMCluster configuration, but I only see one in your squeue output. And you should use:

cluster.scale(20)
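In full, that would be something like this (wait_for_workers is only available in more recent distributed releases; otherwise you can just poll scheduler_info until the workers show up):

cluster.scale(20)               # 20 workers; with processes=10 this submits 2 Slurm jobs
client = Client(cluster)
client.wait_for_workers(20)     # block until the workers have actually connected
print(client.scheduler_info())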

And yes, it looks like a memory problem, which seems weird given the amount of data you mention; you should complete one iteration easily. It's difficult to go further, as I have no access to your complete code. Do you get those messages from the Slurm workers' output, or from your main app, i.e. from the scheduler?

guillaumeeb · May 16 '19 20:05

cluster.start_workers(20)

This should start 2 Slurm jobs according to your SLURMCluster configuration, but I only see one in your squeue output. And you should use:

cluster.scale(20)

I tried with cluster.scale(20) and got the 2 jobs. However, nothing changed.

And yes, it looks like a memory problem, which seems weird given the amount of data you mention; you should complete one iteration easily. It's difficult to go further, as I have no access to your complete code. Do you get those messages from the Slurm workers' output, or from your main app, i.e. from the scheduler?

I'll be back with a small Python script that reproduces the problem.

ofmla · May 17 '19 22:05

cluster.start_workers(20)

This should start 2 Slurm jobs according to your SLURMCluster configuration, but I only see one in your squeue output. And you should use:

cluster.scale(20)

I tried with cluster.scale(20) and got the 2 jobs. However, nothing changed.

Actually, the app starts after some time, but the Slurm jobs take much longer than usual. The wait was excessive and I had to kill the app. Maybe it is directly related to the memory problem.

And yes, it looks like a memory problem, which seems weird given the amount of data you mention; you should complete one iteration easily. It's difficult to go further, as I have no access to your complete code. Do you get those messages from the Slurm workers' output, or from your main app, i.e. from the scheduler?

I'll be back with a small Python script that reproduces the problem.

My app depends on two frameworks, DEAP (Distributed Evolutionary Algorithms in Python, https://github.com/DEAP/deap) and Devito (https://github.com/opesci/devito), Python frameworks for evolutionary algorithms and automated finite-difference computations, respectively. So I decided to leave them out and try to simulate what is done for each particle/individual, that is, to generate chunks of data and compare them with the true data to compute the cost function. Below are the data and the small script that I used:

https://sesibahia-my.sharepoint.com/:u:/g/personal/oscar_ladino_fieb_org_br/ETC6Ocq1yJJElN-jS2fExXAB1BpT5rMO_S9hRd5ncl3VWA?e=Y9LIeg

import numpy
import sys
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, wait

def find_mem_pos(header_lines):
    # Locate the '--mem=' line in the generated job header so it can be removed
    for i, line in enumerate(header_lines):
        if '--mem=' in line:
            return i

def my_task(i, in_data):
    numpy.random.seed(i)
    error = 0.
    print(len(in_data[0][0]))
    for j in range(len(in_data[0][0])):
        local_data = numpy.random.random(size=(6559, 369))
        res = get_value(local_data, in_data[:, :, j])
        error += res
    return (numpy.array(error.data),)

def get_value(dcalc, dobs):
    return 0.5 * numpy.sum((dcalc - dobs)**2.)

cluster = SLURMCluster(cores=2,
                       memory="10GB",
                       queue="standard",
                       interface="ib0")
header_lines = cluster.job_header.split('\n')
mem_pos = find_mem_pos(header_lines)
header_lines = header_lines[:mem_pos] + header_lines[mem_pos+1:]
cluster.job_header = '\n'.join(header_lines)
print(cluster.job_script())
print("Waiting for workers")
cluster.scale(20)
client = Client(cluster)

# Load the data
shots = numpy.fromfile('shots.file', dtype=numpy.float32)
shots = numpy.reshape(shots, (6559, 369, 50))
[remote_data] = client.scatter([shots], broadcast=True)

features = client.map(my_task, range(360), in_data=remote_data)
for fit in features:
    print(fit.result())

This code works properly and I don't have any issue with memory. However, when Devito is added to generate the data through finite-difference computations, I face problems with memory, even using a clear_cache function as mentioned earlier. I will ask the Devito developers for help, and when the problem is solved I'll try the cluster configuration above again.
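One generic pattern that might help (just a sketch, not specific to Devito) is to drop large temporaries explicitly and force a garbage-collection pass before each task returns:

import gc
import numpy

def my_task_with_cleanup(i, in_data):
    # Same structure as my_task above, but releases the temporary chunk
    # and runs a GC pass before the task returns.
    numpy.random.seed(i)
    error = 0.
    for j in range(in_data.shape[2]):
        local_data = numpy.random.random(size=(6559, 369))
        error += 0.5 * numpy.sum((local_data - in_data[:, :, j])**2.)
        del local_data    # drop the reference to the ~9 MB temporary
    gc.collect()          # reclaim memory before the worker moves on to the next task
    return (numpy.array(error),)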

ofmla · May 21 '19 14:05

@ofmla any news on this one?

This is probably due to the external framework not freeing up memory correctly...
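If you pick this up again, a rough way to check that from the client side (standard distributed client API only, just a sketch) is:

import gc

client.run(gc.collect)                          # force a garbage-collection pass on every worker
workers = client.scheduler_info()["workers"]
for addr, info in workers.items():
    print(addr, info["metrics"].get("memory"))  # resident memory reported by each worker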

guillaumeeb · Aug 18 '19 16:08

Sorry, @guillaumeeb. I was busy with other stuff and I shelved my project for a while, as there is definitely a problem with the other framework I am using. When the maintainers figure out a fix to the problem, I will try the options you suggested again. Thanks for your help.

ofmla · Aug 21 '19 18:08

Closing this issue as stale. @ofmla if you ever work on this again, feel free to reopen.

guillaumeeb · Aug 30 '22 06:08