
Enhance TaskArrayCollector to support configurable timeout-based array submission

Open johnoooh opened this issue 3 months ago • 4 comments

Currently, TaskArrayCollector submits a job array only when the array reaches its configured size. In pipelines with slow task generation or variable workloads, a partial batch can stay in memory indefinitely if the array never fills, which delays execution. This is especially a problem in pipelines with channels that never close, such as those created by watchPath. We have a few pipelines that run indefinitely, watching a samplesheet for new entries and processing them as they arrive. We would like to use array jobs to reduce the load on the Slurm scheduler as much as possible, since that load has been a problem on our cluster.

Below is a demonstration of the issue:

#!/usr/bin/env nextflow
params.array_size = 10
params.sleep_time = 30
params.outdir = "results"
params.input_dir = "input_files"
params.partition = 'testpartition'

process DUMMY_TASK {
    executor 'slurm'
    queue params.partition
    array params.array_size
    cpus 1
    memory '1 GB'
    time '5m'
    debug true
    tag "task_${task.index}"
    
    input:
    path input_file
    
    output:
    path "${input_file.baseName}_output.txt"
    
    script:
    """
    echo "SLURM_ARRAY_JOB_ID: \${SLURM_ARRAY_JOB_ID:-'not_set'}" >> ${input_file.baseName}_output.txt
    # Simulate processing
    sleep ${params.sleep_time}
    """
}

workflow {
    input_files = Channel.watchPath("${params.input_dir}/*.txt")

    DUMMY_TASK(input_files)

    DUMMY_TASK.out.view { file ->
        "[${new Date().format('HH:mm:ss')}] Processed: ${file.name}"
    }
} 

Then, in the same directory, run:

for i in {1..16}; do echo "Input data for task $i" > input_files/task_${i}.txt && echo "Created input_files/task_${i}.txt"; sleep .5; done

Currently, Nextflow hangs at:

[be/e4bc7e] process > DUMMY_TASK (task_15) [100%] 10 of 10

There should be 16 tasks, since we created 16 files. Nextflow is waiting for 4 more tasks to reach the array size of 10 before it submits the next array job. If those 4 never arrive, the 6 pending tasks are never submitted as jobs.

My proposal is to add a maximum time to wait for more tasks before submitting a partial array. The pipeline would then work like this:

The first 10 tasks are submitted as an array job, as expected. The next 6 tasks are added to the task array. Once the time specified by executorSubmitTimeout elapses, the remaining 6 tasks are packed into a job array and submitted to the cluster.
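To make the proposed behavior concrete, here is a minimal Java sketch of a collector that flushes either when the batch is full or when the oldest pending item has waited longer than a timeout. It is not the code in the fork and not Nextflow's API: the class TimedBatchCollector, its methods collect and tick, and the explicit nowMillis argument are all hypothetical names chosen for illustration. Only the idea of a submit timeout (executorSubmitTimeout above) comes from this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Illustrative sketch only (hypothetical names, not Nextflow's API).
 * Batches items and flushes when the batch is full, or when the oldest
 * pending item has waited at least timeoutMillis.
 */
class TimedBatchCollector<T> {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> submit;
    private final List<T> pending = new ArrayList<>();
    private long firstArrivalMillis; // arrival time of the oldest pending item

    TimedBatchCollector(int batchSize, long timeoutMillis, Consumer<List<T>> submit) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.submit = submit;
    }

    /** Add an item; flush immediately if the batch is now full. */
    synchronized void collect(T item, long nowMillis) {
        if (pending.isEmpty()) {
            firstArrivalMillis = nowMillis;
        }
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Meant to be called periodically by a timer; flushes a partial batch that has waited too long. */
    synchronized void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstArrivalMillis >= timeoutMillis) {
            flush();
        }
    }

    /** Hands a copy of the pending items to the submit callback and resets the batch. */
    private void flush() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        submit.accept(batch);
    }
}
```

With a batch size of 10, collecting 16 items triggers one full flush and leaves 6 pending; a later tick at or past the timeout flushes those 6. In a real implementation, a timer such as a ScheduledExecutorService could call tick(System.currentTimeMillis()) at a fixed interval. Passing the time in explicitly is a design choice made here only to keep the sketch deterministic and easy to test.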

This is currently implemented in my WIP fork (https://github.com/johnoooh/nextflow/blob/feature/slurm_array_timer/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy). It still needs a few changes, and the parameter needs to be added to the config.

I know this is a pretty specific issue, but I don't think it's an unreasonable addition. Let me know if you foresee any issues with this approach.

This discussion was started in #5924.

johnoooh avatar Oct 09 '25 20:10 johnoooh

@johnoooh, if you could submit a PR (even just a draft) with your changes to get the process started, that would help me move things along.

bentsherman avatar Nov 19 '25 16:11 bentsherman

Hey Ben, I'm still working on this. I'll put together a draft PR, but I'm currently hitting a bug when running a lot of samples on the branch. I think I need to add some checks so the Groovy arrays are not modified multiple times.

If you're interested, here are a few lines from .nextflow.log:

Dec-09 09:46:48.642 [Actor Thread 276] DEBUG nextflow.processor.TaskProcessor - Handling unexpected condition for
  task: name=snv_wf:RunMutect2 (sampleid@idt-0019-scattered); work-dir=./workdir/
  error [java.util.ConcurrentModificationException]: java.util.ConcurrentModificationException
Dec-09 09:46:48.669 [Actor Thread 276] ERROR nextflow.processor.TaskProcessor - Error executing process > 'snv_wf:RunMutect2 (sampleid@idt-0019-scattered)'

Caused by:
  java.util.ConcurrentModificationException


java.util.ConcurrentModificationException: null
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1096)
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1050)
	at nextflow.executor.BashWrapperBuilder.createContainerBuilder(BashWrapperBuilder.groovy:737)
	at nextflow.executor.BashWrapperBuilder.makeBinding(BashWrapperBuilder.groovy:305)
	at nextflow.executor.BashWrapperBuilder.buildNew0(BashWrapperBuilder.groovy:429)
	at nextflow.executor.BashWrapperBuilder.buildNew0(BashWrapperBuilder.groovy:191)
	at nextflow.executor.BashWrapperBuilder.build(BashWrapperBuilder.groovy:444)
	at nextflow.executor.GridTaskHandler.prepareLauncher(GridTaskHandler.groovy:106)
	at nextflow.processor.TaskArrayCollector.createTaskArray(TaskArrayCollector.groovy:203)
	at nextflow.processor.TaskArrayCollector.submitAndReset(TaskArrayCollector.groovy:166)
	at nextflow.processor.TaskArrayCollector.flushArrayDueToSize(TaskArrayCollector.groovy:146)
	at nextflow.processor.TaskArrayCollector.collect(TaskArrayCollector.groovy:132)
	at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
	at nextflow.processor.TaskProcessor.submitTask(TaskProcessor.groovy:2357)
	at nextflow.processor.TaskProcessor.checkCachedOrLaunchTask(TaskProcessor.groovy:845)
	at nextflow.processor.TaskProcessor.invokeTask(TaskProcessor.groovy:657)
	at nextflow.processor.InvokeTaskAdapter.call(InvokeTaskAdapter.groovy:52)
	at groovyx.gpars.dataflow.operator.DataflowOperatorActor.startTask(DataflowOperatorActor.java:120)
	at groovyx.gpars.dataflow.operator.ForkingDataflowOperatorActor.access$001(ForkingDataflowOperatorActor.java:35)
	at groovyx.gpars.dataflow.operator.ForkingDataflowOperatorActor$1.run(ForkingDataflowOperatorActor.java:58)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1575)
Dec-09 09:46:48.682 [Actor Thread 276] DEBUG nextflow.Session - Session aborted -- Cause: java.util.ConcurrentModificationException
Dec-09 09:46:48.713 [Task submitter] DEBUG nextflow.executor.GridTaskHandler - [SLURM] submitted process snv_wf:RunMutect2 (42511) > jobId: 17923539; workDir: ./work/
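
For context on the exception in the trace above: ArrayList iterators throw ConcurrentModificationException when the list is structurally modified while it is being iterated. The Java sketch below reproduces that in a single thread and shows one common mitigation, iterating over a snapshot copy. Which list is affected in BashWrapperBuilder.createContainerBuilder, and what modifies it, is not established in this thread, so the class CmeDemo and its methods are purely hypothetical illustrations, not a diagnosis or a fix for the branch.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

/** Illustrative sketch only (hypothetical names, unrelated to Nextflow's classes). */
class CmeDemo {
    /** Iterates the live list while it is modified; returns true if the iterator detected the change. */
    static boolean iterateLive(List<String> tasks) {
        try {
            for (String t : tasks) {
                if (t.equals("a")) {
                    tasks.add("late"); // structural change in the middle of iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Iterates a private snapshot, so changes to the shared list cannot affect the loop. */
    static int iterateSnapshot(List<String> tasks) {
        List<String> snapshot;
        synchronized (tasks) { // assumption: every writer also synchronizes on `tasks`
            snapshot = new ArrayList<>(tasks);
        }
        int seen = 0;
        for (String t : snapshot) {
            if (t.equals("a")) {
                tasks.add("late"); // safe here: the loop reads the snapshot, not the shared list
            }
            seen++;
        }
        return seen;
    }
}
```

The snapshot approach only helps if the copy itself is taken while no other thread is writing to the list, which is why the sketch assumes all writers use the same lock.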

johnoooh avatar Dec 10 '25 18:12 johnoooh

I would just make a PR with what you have and I can work from that

bentsherman avatar Dec 10 '25 21:12 bentsherman

I made the draft PR: https://github.com/nextflow-io/nextflow/pull/6647

johnoooh avatar Dec 10 '25 21:12 johnoooh