
Job array for more efficient SGE cluster management

Open mgandal opened this issue 4 years ago • 17 comments

I am using Nextflow with an SGE task scheduler. I've noticed that the pipeline submits many very similar jobs, batched across a number of samples (e.g., FastQC on hundreds of RNA-seq files with the nextflow/rnaseq pipeline). Some job schedulers cannot handle this many jobs being submitted at a time. I know there are ways to reduce the rate and frequency of job submission. However, it would be much more efficient to be able to use job arrays for these highly repetitive tasks, e.g. qsub -t 1-400 this_task.
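
For concreteness, here is a minimal sketch of the kind of SGE array job I mean (the sample list file and the FastQC invocation are just illustrative):

#!/bin/bash
#$ -t 1-400
# SGE sets $SGE_TASK_ID to the index of the current array element;
# use it to select one input sample per element from a list.
SAMPLE=$(sed -n "${SGE_TASK_ID}p" samples.txt)
fastqc "$SAMPLE"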

mgandal avatar Jan 29 '20 19:01 mgandal

I'm also interested in support for array jobs on Slurm and AWS Batch. I'm wondering whether the foundation for all executors is within one class or whether this has to be implemented per executor.

https://github.com/nextflow-io/nextflow/issues/1427

tbugfinder avatar Jan 29 '20 21:01 tbugfinder

Maybe I should open a new issue for Slurm: https://slurm.schedmd.com/job_array.html

tbugfinder avatar Jan 29 '20 21:01 tbugfinder

I think you can make this work currently using clusterOptions and the scheduler's task ID environment variable. Toy example below for Slurm.

process foo {
    time '5m'
    executor 'slurm'
    clusterOptions '--array=1-10'

    '''
    echo ${SLURM_ARRAY_TASK_ID} > new_${SLURM_ARRAY_TASK_ID}.txt
    '''
}

KevinSayers avatar Jan 31 '20 19:01 KevinSayers

Job arrays do not match the NF design, by which each task should have its own work dir. No plan in the short/medium term.

pditommaso avatar Mar 26 '20 13:03 pditommaso

I think you can make this work currently using clusterOptions and the scheduler's task ID environment variable. Toy example below for Slurm.

process foo {
    time '5m'
    executor 'slurm'
    clusterOptions '--array=1-10'

    '''
    echo ${SLURM_ARRAY_TASK_ID} > new_${SLURM_ARRAY_TASK_ID}.txt
    '''
}

I tried this, and I think the task finishes as soon as the first array element finishes. The other elements continue to run, but their results will not be in the output.

idot avatar Nov 29 '20 14:11 idot

I just wanted to add a vote for this feature and add some more rationale.

Nextflow is a great workflow language, but it's hard to justify its claim of scalability if it cannot handle more than ~500 concurrent jobs, as demonstrated here: https://www.biorxiv.org/content/10.1101/2021.04.03.437906v1.full.pdf

I've reproduced the same result with LSF, and it's not very surprising, given that job schedulers are not designed to handle individual jobs at that scale. With array jobs on LSF, we can efficiently submit many thousands of jobs, which is the level of parallelism required to work effectively with data that's hundreds of terabytes in size.

That being said, I can definitely see how it's not trivial to square with the current task design.

For anyone else looking for a stop-gap solution, there's this awesome hack on the Gitter: https://gitter.im/nextflow-io/nextflow?at=5f736648f41f4105e4e5a0b4

krokicki avatar Apr 11 '21 01:04 krokicki

I don't think that benchmark is accurate, and looking at the implementation, I'm quite convinced that the problem is caused by incorrect use of the maxForks directive:

https://github.com/pditommaso/scalability-tst/commit/41ddbf2224832c713aee5b8920c5fcb4059c2d3a

NF can scale well beyond 500 parallel tasks. I'm not even convinced that array jobs are much more efficient than normal jobs; the scheduler has to track their status independently in any case.

pditommaso avatar Apr 11 '21 15:04 pditommaso

Thanks for debunking that. I double-checked my own tests, and it seems I was running up against executor.queueSize, which has a default of 100. Increasing this helped a ton.

But still, as far as I can tell, independent jobs don't come close to array jobs in job submission performance.

Here's a benchmark I just ran on our mostly-empty LSF cluster:

#!/bin/bash
# scale.sh: each array element reports its index and host, then sleeps.
echo "Running job#$LSB_JOBINDEX on "`hostname`
sleep 5
echo "Done"

# Submit as a 1000-element array; %I expands to the array index.
bsub -J "scale_test[1-1000]" -o $PWD/output/output.%I $PWD/scale.sh

The job submission is instantaneous, and the time from invocation to the final job finishing was 1m1s. I ran the test two more times and got 55s and 53s.

I tried this with Nextflow:

counter_channel = Channel.of(1..1000)

process test {
    input:
    val counter from counter_channel

    """
    echo "Running job#$counter on "`hostname`
    sleep 5
    echo "Done"
    """
}

My nextflow.config:

executor {
    queueSize = 1000
    pollInterval = "2 sec"
}

Nextflow reports 3m2s for the workflow. When I reran I got 3m4s and 3m30s.

This is mainly due to the overhead of submitting the jobs individually: it takes about 2.5 minutes before the job counter ticks up to 1000. I see basically the same behavior with a bash loop: each bsub call has roughly 130 ms of latency, and over 1000 submissions that alone adds up to more than two minutes.

krokicki avatar Apr 13 '21 03:04 krokicki

NF is designed for coarse-grained task parallelisation, not for short-lived task executions. Therefore this magnitude of overhead is somewhat expected, since it has minimal impact on the overall duration of a typical pipeline.

pditommaso avatar Apr 13 '21 04:04 pditommaso

Makes sense. I just wanted to shed some more light on the use case, in case there is some redesign or extension in the future that could address it. Thanks for considering!

krokicki avatar Apr 13 '21 16:04 krokicki

Thanks for your feedback!

pditommaso avatar Apr 13 '21 17:04 pditommaso

I don't think that benchmark is accurate, and looking at the implementation, I'm quite convinced that the problem is caused by incorrect use of the maxForks directive:

pditommaso/scalability-tst@41ddbf2

NF can scale well beyond 500 parallel tasks. I'm not even convinced that array jobs are much more efficient than normal jobs; the scheduler has to track their status independently in any case.

Just some information about the effect of submitting a large number of individual jobs with Slurm. Each job has to be entered into the database as soon as it is submitted, and each job, whether pending or running, is listed individually in the job list produced by squeue. Thus, submitting a large number of jobs within a very short time can make the database unresponsive and makes the list of pending jobs very long.

On the other hand, when the jobs are submitted via a job array, all pending jobs are represented as a single database entry and only get an individual entry once running. In the job list, all pending jobs belonging to a given array can be listed as a single line.

Thus, from the point of view of the system admin, it is much more desirable if a large number of jobs is submitted as a job array. In particular, the creation of job entries in the database is actually more efficient for a job array (assuming there are not enough resources for all the jobs to start immediately).
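
For illustration, the two submission styles look like this (job.sh is a placeholder script):

# 1000 individual jobs: 1000 sbatch calls, 1000 pending database entries
for i in $(seq 1 1000); do sbatch job.sh; done

# One array job: a single submission and a single pending entry
sbatch --array=1-1000 job.sh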

tardigradus avatar Apr 19 '22 09:04 tardigradus

Let's re-open this

pditommaso avatar Apr 19 '22 09:04 pditommaso

A challenging aspect of implementing support for job arrays is that Nextflow is modelled around the idea that a job is a function with no side effects.

In practical terms, this means each job needs to be assigned its own work directory so that outputs do not overlap each other.

How is the working directory managed for job arrays with SGE or Slurm? I guess the child jobs in the array are all allocated the same work directory.

pditommaso avatar Apr 19 '22 09:04 pditommaso

Each job array has a single working directory, but for each element of a Slurm job array, the environment variable SLURM_ARRAY_TASK_ID contains the value of the index. This can then be used to construct unique (sub-)directory names.
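
For example, a minimal sbatch script along these lines (the directory layout is illustrative):

#!/bin/bash
#SBATCH --array=0-9
# Give each array element its own subdirectory, keyed by its index,
# so that outputs from different elements cannot collide.
WORKDIR="work/task_${SLURM_ARRAY_TASK_ID}"
mkdir -p "$WORKDIR"
cd "$WORKDIR"
echo "running element ${SLURM_ARRAY_TASK_ID}" > log.txt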

tardigradus avatar Apr 19 '22 10:04 tardigradus

Looking at this again, I wonder whether it is not already possible to use Slurm job arrays. If I write clusterOptions = '--array=0-999', can't I make use of SLURM_ARRAY_TASK_ID in my Slurm batch script and start each job in the array with, say, its own input and output directory?

Essentially this would just entail moving the lowest level of pipeline granularity (say, a loop over the values of a parameter) out of nextflow into Slurm.
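
Something like this minimal sketch of the idea (untested; note the caveat reported above that Nextflow may consider the task finished as soon as the first array element exits, and process_chunk.sh is a hypothetical per-index worker):

process arrayed {
    executor 'slurm'
    clusterOptions '--array=0-999'

    '''
    # Each array element derives its own output directory from its index.
    OUT=out_${SLURM_ARRAY_TASK_ID}
    mkdir -p ${OUT}
    ./process_chunk.sh ${SLURM_ARRAY_TASK_ID} > ${OUT}/result.txt
    '''
}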

WDYT?

tardigradus avatar May 20 '22 08:05 tardigradus

This could work, but I guess the aim of this issue is to provide more native integration with job arrays.

Somewhat related to this, there's an interesting proposal to add support for HyperQueue to Nextflow, which looks like it provides a solution to handling high job throughput that fits the Nextflow model better.

https://github.com/nextflow-io/nextflow/issues/2867

pditommaso avatar May 21 '22 14:05 pditommaso

This could work, but I guess the aim of this issue is to provide more native integration with job arrays.

Somewhat related to this, there's an interesting proposal to add support for HyperQueue to Nextflow, which looks like it provides a solution to handling high job throughput that fits the Nextflow model better.

#2867

According to this information, while HyperQueue does support job arrays, it does not make use of the job array mechanism provided by Slurm. Therefore it does not (currently) address the issue raised here.

tardigradus avatar Aug 11 '22 08:08 tardigradus

One of the main benefits of job arrays is that they reduce the size of the scheduling queue. Slurm, and no doubt other schedulers, has specific performance improvements implemented for job arrays. Also, when looking for jobs that can potentially backfill, the scheduler considers a certain number of jobs at the top of the scheduling queue. A job array fills only one slot in this list, so far more different types of job can be considered for backfilling when job arrays are used. This is needed to fill as much of the cluster's capacity as possible with useful work. Without it, you end up with a lot of scheduling gaps, wasting the potential of the cluster, the money spent on buying it, and the electricity cost of running it.

emyr666 avatar Nov 11 '22 13:11 emyr666

I hadn't been aware of the potential problem with backfilling. That is also something we would take very seriously, as Nextflow allows relatively inexperienced users, who might overestimate their job requirements, to generate a large number of jobs without using a job array. Without Nextflow, users have to code some other mechanism for submitting jobs, so they tend to be rather more advanced. Although that is still no guarantee that they will get their job requirements right, it is usually easier to explain the problem to them.

tardigradus avatar Nov 11 '22 13:11 tardigradus

something advanced like:

for x in {1..1000000}; do 
  qsub my-job.sh
done

pditommaso avatar Nov 11 '22 16:11 pditommaso

Can the LSF label be added? LSF also understands (and prefers) job arrays.

muffato avatar Nov 11 '22 16:11 muffato

something advanced like:

for x in {1..1000000}; do 
  qsub my-job.sh
done

Yes, exactly :-) We have many users who just inherit a job script from somewhere and have absolutely no concept that you can program in the shell, nor that a job script contains commands you could run directly on the command line.

tardigradus avatar Jan 04 '23 09:01 tardigradus

I just wanted to add that job array support (LSF in my case) is a key need for several of my use cases.

trautmane avatar Mar 22 '23 15:03 trautmane

Hi Paolo,

I think at the scale we are using NF at 23andMe, we would really benefit from this. We are currently bottlenecked by UGE, as it takes 5-10 minutes to submit 1,000 jobs (whereas as an array job it should take a second). As a result, we often end up submitting jobs faster than UGE can queue them.

santy-ttam avatar Apr 07 '23 05:04 santy-ttam

I'm a sysadmin of an HPC cluster. We ran SGE/UGE in the past and are currently using SLURM. I can confirm that it's far more efficient for scheduling systems to use array jobs than for looping qsub or sbatch. For this reason, to encourage our community to use array jobs, we even set default per-user job/submission limits, so we don't get spammed by non-array jobs, e.g. via MaxSubmitJobsPerUser in SLURM. This can obviously conflict with the way nextflow currently works and is another reason for first-class support of array jobs in nextflow.

wookietreiber avatar Apr 18 '23 10:04 wookietreiber

For this reason, to encourage our community to use array jobs, we even set default per-user job/submission limits, so we don't get spammed by non-array jobs, e.g. via MaxSubmitJobsPerUser in SLURM.

Does this actually work? My understanding of https://bugs.schedmd.com/show_bug.cgi?id=3409 is that each array task is treated as a single job w.r.t. limits such as MaxSubmitJobsPerUser. Is this not the case?

tardigradus avatar Apr 18 '23 11:04 tardigradus

For this reason, to encourage our community to use array jobs, we even set default per-user job/submission limits, so we don't get spammed by non-array jobs, e.g. via MaxSubmitJobsPerUser in SLURM.

Does this actually work? My understanding of https://bugs.schedmd.com/show_bug.cgi?id=3409 is that each array task is treated as a single job w.r.t. limits such as MaxSubmitJobsPerUser. Is this not the case?

@tardigradus You're right, I wasn't aware of the details: MaxSubmitJobsPerUser does count the array job tasks; I was thinking of our old grid engine configuration. I asked my colleague @tazend, who implemented this; he said we're currently enforcing a maximum of 1000 jobs via a job submission verifier plugin, which doesn't count the array job tasks.

wookietreiber avatar Apr 18 '23 12:04 wookietreiber

We're thinking about implementing job arrays in the form of a new executor (e.g. array or batch executor) which, given a "target" executor and batch size, submits tasks to the target executor as array jobs. So you spend a little more time waiting for enough tasks to be created before submitting them, but then you hopefully save more time by submitting them as array jobs.

It looks like the executors people have requested so far are:

  • SLURM
  • SGE
  • LSF
  • AWS Batch

Also I know from experience that PBS supports job arrays.

The main thing I'm wondering is, how do you specify resources for array jobs? If people could chime in for their executor, that would be great. For example, if I was going to submit an array job with 1000 tasks and 1 cpu + 2 GB mem + 1 hr walltime per task:

  1. What would the submit options look like?
  2. Would the walltime constraint apply to the entire array job or each task independently (or maybe that doesn't matter if they all start at the same time)?

bentsherman avatar Apr 19 '23 22:04 bentsherman

I'm glad to see this going forward.

The main thing I'm wondering is, how do you specify resources for array jobs? If people could chime in for their executor, that would be great. For example, if I was going to submit an array job with 1000 tasks and 1 cpu + 2 GB mem + 1 hr walltime per task:

1. What would the submit options look like?

#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=2G
#SBATCH --time=01:00:00
#SBATCH --array=0-999

2. Would the walltime constraint apply to the entire array job or each task independently (or maybe that doesn't matter if they all start at the same time)?

All parameters except --array apply to each element of the array; specifically, the walltime constraint of one hour applies to each element independently. In general, the elements of an array will not all start at the same time.

tardigradus avatar Apr 20 '23 06:04 tardigradus