Job array for more efficient SGE cluster management
I am using nextflow with a SGE task scheduler. I've noticed that the pipeline submits many very similar jobs that are batched across a number of samples (e.g., FastQC on hundreds of RNA-seq files with the nextflow/rnaseq pipeline). Some job schedulers cannot handle this many jobs being submitted at a time. I know there are ways to reduce the speed and frequency of job submission. However, it would be much more efficient to be able to use job arrays for these highly repetitive tasks.
E.g., qsub -t 1-400 this_task
I'm also interested in support for array jobs on Slurm and AWS Batch. I'm wondering whether the foundation for all executors is within one class or if this has to be implemented per executor.
https://github.com/nextflow-io/nextflow/issues/1427
Maybe I should open a new issue for Slurm: https://slurm.schedmd.com/job_array.html
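For reference, a minimal sketch of the equivalent submission on Slurm. `sbatch` is stubbed here so the snippet runs outside a cluster, and `this_task.sh` is a hypothetical script; on a real system the stub would be removed.

```shell
# Stub so the sketch runs without a Slurm installation; on a real
# cluster, remove this and let the real sbatch handle the submission.
sbatch() { echo "Submitted batch job 1234 with: $*"; }

# One submission covering 400 tasks; %A expands to the array job ID
# and %a to the task index, giving each element its own output file.
sbatch --array=1-400 --output=out.%A_%a this_task.sh
```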
I think you can make this work currently using clusterOptions and the task id environment variable for the scheduler. Toy example below for slurm.
process foo {
    time '5m'
    executor 'slurm'
    clusterOptions '--array=1-10'

    '''
    echo ${SLURM_ARRAY_TASK_ID} > new_${SLURM_ARRAY_TASK_ID}.txt
    '''
}
Job arrays do not match the NF design, by which each task should have its own work dir. No plan in the short/medium term.
I tried this and I think the task finishes as soon as the first array task finishes. The other tasks continue to run, but their results will not be in the output.
I just wanted to add a vote for this feature and add some more rationale.
Nextflow is a great workflow language, but it's hard to justify its claim of scalability if it cannot handle more than ~500 concurrent jobs, as demonstrated here: https://www.biorxiv.org/content/10.1101/2021.04.03.437906v1.full.pdf
I've reproduced the same result with LSF, and it's not very surprising given that job schedulers are not meant to function at scale for individual jobs. With array jobs on LSF, we can efficiently submit many thousands of jobs, which is the level of parallelism required for effectively working with data that's hundreds of terabytes in size.
That being said, I can definitely see how it's not trivial to square with the current task design.
For anyone else looking for a stop-gap solution, there's this awesome hack on the Gitter: https://gitter.im/nextflow-io/nextflow?at=5f736648f41f4105e4e5a0b4
I don't think that benchmark is accurate and, looking at the implementation, I'm quite convinced that the problem is caused by wrong use of the maxForks directive: https://github.com/pditommaso/scalability-tst/commit/41ddbf2224832c713aee5b8920c5fcb4059c2d3a
NF can scale well beyond 500 parallel tasks. I'm not even convinced that array jobs are much more efficient than normal jobs; the scheduler has to track their status independently in any case.
Thanks for debunking that. I double checked my own tests and it seems that I was running up against executor.queueSize, which has a default of 100. Increasing this helped a ton.
But still, as far as I can tell, independent jobs don't come close to array jobs in job submission performance.
Here's a benchmark I just ran on our mostly-empty LSF cluster:
scale.sh:

#!/bin/bash
echo "Running job#$LSB_JOBINDEX on "`hostname`
sleep 5
echo "Done"

Submitted with:

bsub -J "scale_test[1-1000]" -o $PWD/output/output.%I $PWD/scale.sh
The job submission is instantaneous, and the time from invocation to the final job finishing was 1m1s. I ran the test two more times and got 55s and 53s.
I tried this with Nextflow:
counter_channel = Channel.of(1..1000)

process test {
    input:
    val counter from counter_channel

    """
    echo "Running job#$counter on "`hostname`
    sleep 5
    echo "Done"
    """
}
My nextflow.config:
executor {
    queueSize = 1000
    pollInterval = "2 sec"
}
Nextflow reports 3m2s for the workflow. When I reran I got 3m4s and 3m30s.
This is mainly due to overhead for individually submitting the jobs. It's about 2.5 minutes before the job counter ticks up to 1000. I see basically the same behavior with a bash loop: each bsub has some ~130ms latency and this just adds up.
NF is designed for coarse-grained task parallelisation, not for short-lived task executions. Therefore this magnitude of overhead is somewhat expected, since it has minimal impact on the overall pipeline duration.
Makes sense. I just wanted to shed some more light on the use case, in case there is some redesign or extension in the future that could address it. Thanks for considering!
Thanks for your feedback!
Just some information about the effect of submitting a large number of individual jobs with Slurm. Each job has to be entered into the database as soon as it is submitted, and each job, whether pending or running, is listed individually in the list of jobs produced by squeue. Thus, the submission of a large number of jobs within a very short time can make the database unresponsive and will make the list of pending jobs very long.
On the other hand, when the jobs are submitted via a job array, all pending jobs are represented as a single database entry and only get an individual entry once running. In the job list, all pending jobs belonging to a given array can be listed as a single line.
Thus, from the point of view of the system admin, it is much more desirable if a large number of jobs is submitted as a job array. In particular, the creation of job entries in the database is actually more efficient for a job array (assuming there are not enough resources for all the jobs to start immediately).
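To make the submission-count contrast concrete, here is a hedged sketch with `sbatch` stubbed so it runs anywhere; on a real cluster each stubbed call would be a separate scheduler transaction and database entry.

```shell
# Stub: echo submissions instead of talking to a real Slurm daemon.
sbatch() { echo "sbatch $*"; }

# Individual jobs: one submission (and one database entry) per job.
for i in 1 2 3; do sbatch my-job.sh; done | grep -c '^sbatch'   # prints 3

# Array job: a single submission; pending elements share one entry.
sbatch --array=1-1000 my-job.sh | grep -c '^sbatch'             # prints 1
```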
Let's re-open this
A challenging aspect of implementing support for job arrays is that Nextflow is modelled around the idea that a job is a function with no side effects.
In practical terms, this means each job needs to be assigned its own work directory so that outputs do not overlap each other.
How is the working directory managed for job arrays with SGE or Slurm? I guess the child jobs in the array are all allocated the same work directory.
Each job array has a single working directory, but for each element of a Slurm job array, the environment variable SLURM_ARRAY_TASK_ID contains the value of the index. This can then be used to construct unique (sub-)directory names.
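That pattern can be sketched as follows; the loop merely simulates the index values that Slurm would supply via SLURM_ARRAY_TASK_ID to each element, and the work/ layout is a hypothetical choice.

```shell
# Simulate three array elements; in a real batch script Slurm sets
# SLURM_ARRAY_TASK_ID itself and each element runs this body once.
for SLURM_ARRAY_TASK_ID in 1 2 3; do
  # Per-index sub-directory keeps each element's outputs separate.
  workdir="work/array_${SLURM_ARRAY_TASK_ID}"
  mkdir -p "$workdir"
  echo "result of task ${SLURM_ARRAY_TASK_ID}" > "$workdir/out.txt"
done

cat work/array_2/out.txt   # prints: result of task 2
```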
Looking at this again, I wonder whether it is not already possible to use Slurm job arrays. If I write clusterOptions = '--array=0-999', can't I make use of SLURM_ARRAY_TASK_ID in my Slurm batch script and start each job in the array with, say, its own input and output directory?
Essentially this would just entail moving the lowest level of pipeline granularity (say, a loop over the values of a parameter) out of nextflow into Slurm.
WDYT?
This could work, but I guess the aim of this issue is to provide more native integration with job arrays.
Somewhat related to this, there's an interesting proposal to add support for HyperQueue to Nextflow, which looks like it provides a solution to handling high job throughput that better fits the Nextflow model.
https://github.com/nextflow-io/nextflow/issues/2867
According to this information, while HyperQueue does support job arrays, it does not make use of the job array mechanism provided by Slurm. Therefore it does not (currently) address the issue raised here.
One of the main benefits of job arrays is that they reduce the size of the scheduling queue. Slurm, and no doubt other schedulers, has specific performance improvements implemented for job arrays. Also, when looking for jobs that can potentially backfill, the scheduler considers only a certain number of jobs at the top of the scheduling queue. A job array fills one slot in this list, so with job arrays the scheduler can potentially consider far more different types of job for backfilling. This is needed to fill as much of the cluster's potential with useful work as possible. Without this, you end up with a lot of scheduling gaps, wasting the potential of the cluster, the money spent on buying it, and the electricity cost of running it.
I hadn't been aware of the potential problem with backfilling. That is also something we would take very seriously, as Nextflow allows relatively inexperienced users, who might overestimate their job requirements, to generate a large number of jobs without using a job array. Without Nextflow, users would have to code some other mechanism for submitting jobs, so those users tend to be rather more advanced. Although that is still no guarantee that they will get their job requirements right, it is usually easier to explain the problem to them.
something advanced like:
for x in {1..1000000}; do
qsub my-job.sh
done
Can the LSF label be added? LSF also understands (and prefers) job arrays.
something advanced like:
for x in {1..1000000}; do qsub my-job.sh; done
Yes, exactly :-) We have many users who just inherit a job script from somewhere and have absolutely no concept that you can program in the shell nor that what goes on in the job script are commands that you could run directly on the command line.
I just wanted to add that job array support (LSF in my case) is a key need for several of my use cases.
Hi Paolo,
I think at the scale we are using NF at 23andMe, we would really benefit from this. We are currently bottlenecked by UGE, as it takes 5-10 minutes to submit 1,000 jobs (whereas as an array job it should take a second). As a result, we often end up submitting jobs faster than UGE can queue them.
I'm a sysadmin of an HPC cluster. We ran SGE/UGE in the past and are currently using SLURM. I can confirm that it's far more efficient for scheduling systems to use array jobs than looping qsub or sbatch. For this reason, to encourage our community to use array jobs, we even set default per-user job/submission limits so we don't get spammed by non-array jobs, e.g. via MaxSubmitJobsPerUser in SLURM. This can obviously conflict with the way Nextflow currently works and is another reason for first-class support of array jobs in Nextflow.
For this reason, to encourage our community to use array jobs, we even set default per-user job/submission limits, so we don't get spammed by non-array jobs, e.g. via MaxSubmitJobsPerUser in SLURM.
Does this actually work? My understanding of https://bugs.schedmd.com/show_bug.cgi?id=3409 is that each array task is treated as a single job w.r.t. limits such as MaxSubmitJobsPerUser. Is this not the case?
@tardigradus You're right, I wasn't aware of the details: MaxSubmitJobsPerUser does count the array job tasks; I was thinking of our old grid engine configuration. I asked my colleague @tazend, who implemented this; he said we're currently enforcing a maximum of 1000 jobs via a job submission verifier plugin, which doesn't count the array job tasks.
We're thinking about implementing job arrays in the form of a new executor (e.g. an array or batch executor) which, given a "target" executor and batch size, submits tasks to the target executor as array jobs. So you spend a little more time waiting for enough tasks to be created before submitting them, but then you hopefully save more time by submitting them as array jobs.
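As a rough sketch of that idea (file names and layout are hypothetical, and Slurm is simulated by setting SLURM_ARRAY_TASK_ID by hand): such an executor might stage one script per buffered task, then submit a single launcher whose array index selects which task to run.

```shell
# Stage one script per buffered task, as the proposed array executor might.
mkdir -p work
for i in 1 2 3; do
  printf 'echo "task %s ran"\n' "$i" > "work/task_${i}.sh"
done

# The launcher would be submitted once, e.g. with --array=1-3; each
# array element runs only the script matching its own index.
cat > launcher.sh <<'EOF'
bash "work/task_${SLURM_ARRAY_TASK_ID}.sh"
EOF

# Simulate Slurm executing element 2 of the array:
SLURM_ARRAY_TASK_ID=2 bash launcher.sh   # prints: task 2 ran
```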
It looks like the executors people have requested so far are:
- SLURM
- SGE
- LSF
- AWS Batch
Also I know from experience that PBS supports job arrays.
The main thing I'm wondering is, how do you specify resources for array jobs? If people could chime in for their executor, that would be great. For example, if I was going to submit an array job with 1000 tasks and 1 cpu + 2 GB mem + 1 hr walltime per task:
- What would the submit options look like?
- Would the walltime constraint apply to the entire array job or each task independently (or maybe that doesn't matter if they all start at the same time)?
I'm glad to see this going forward.
The main thing I'm wondering is, how do you specify resources for array jobs? If people could chime in for their executor, that would be great. For example, if I was going to submit an array job with 1000 tasks and 1 cpu + 2 GB mem + 1 hr walltime per task:
1. What would the submit options look like?
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=2G
#SBATCH --time=01:00:00
#SBATCH --array=0-999
2. Would the walltime constraint apply to the entire array job or each task independently (or maybe that doesn't matter if they all start at the same time)?
All parameters except --array apply to each element of the array, so specifically, the walltime constraint of one hour applies to each element. In general, the elements of an array will not start at the same time.