
feat: Azure Batch eagerly terminates jobs after all tasks have been submitted

Open adamrtalbot opened this issue 7 months ago • 5 comments

Azure Batch "job leak" is still an issue. This commit fixes #5839 which allows Nextflow to set jobs to auto terminate when all tasks have been submitted. This means that eventually jobs will move into terminated state even if something prevents nextflow reaching a graceful shutdown. Very early implementation and needs some refinement.

Code-wise, this replaces terminateJobs with setJobTermination, which sets the auto-termination status of the jobs. This is called in a few places:

  • When terminateJobs is called at graceful shutdown by Nextflow (old behaviour)
  • When setAutoJobTermination is called on a specific job

Then a TraceObserver is added for Azure Batch, which calls setJobAutoTerminate when onProcessTerminate is invoked.
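
For illustration, here is a minimal sketch of that observer wiring, assuming the (v1) TraceObserver interface and hypothetical names (AzBatchJobTerminationObserver, and a setJobAutoTermination helper on AzBatchService); the actual code in this PR may differ:

import groovy.util.logging.Slf4j
import nextflow.cloud.azure.batch.AzBatchService
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceObserver

@Slf4j
class AzBatchJobTerminationObserver implements TraceObserver {

    // assumed to be provided by the nf-azure executor; the field is illustrative
    private final AzBatchService batchService

    AzBatchJobTerminationObserver(AzBatchService batchService) {
        this.batchService = batchService
    }

    @Override
    void onProcessTerminate(TaskProcessor process) {
        // once a process will submit no more tasks, flag its Azure Batch job(s)
        // to auto-terminate when all submitted tasks have completed
        log.trace "Setting auto-termination for Azure Batch job(s) of process ${process.name}"
        batchService.setJobAutoTermination(process)    // hypothetical helper name
    }
}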

Very out of my depth in this part of the code base, so expect things to be wrong.

adamrtalbot avatar Jun 04 '25 10:06 adamrtalbot

Deploy Preview for nextflow-docs-staging ready!

Latest commit: bc6b754642ce838edefa0ef697e9b9bbbfc4f370
Latest deploy log: https://app.netlify.com/projects/nextflow-docs-staging/deploys/691b0819199f4a00080de58f
Deploy Preview: https://deploy-preview-6159--nextflow-docs-staging.netlify.app

netlify[bot] avatar Jun 04 '25 10:06 netlify[bot]

Example Nextflow pipeline:

process GREETING {
    executor 'azurebatch'
    container 'ubuntu:22.04'

    input:
        val greeting

    output:
        path "output.txt"

    script:
    """
    echo $greeting > output.txt
    """
}

process SLEEP {
    executor 'azurebatch'
    container 'ubuntu:22.04'

    input:
        path inputfile

    output:
        stdout

    script:
    """
    sleep 360
    cat ${inputfile}
    """
}

workflow {
    Channel.of("hello", "bonjour", "gutentag")
    | GREETING
    | SLEEP
}
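
To actually run this on Azure Batch, a configuration file is needed as well. A minimal nextflow.config sketch, using placeholder account names, keys and container paths (all values below are assumptions to be replaced with your own resources):

// nextflow.config -- placeholder values only
workDir = 'az://my-container/work'          // task work directory in Blob storage

azure {
    storage {
        accountName = '<storage-account-name>'
        accountKey  = '<storage-account-key>'
    }
    batch {
        location          = 'westeurope'
        accountName       = '<batch-account-name>'
        accountKey        = '<batch-account-key>'
        autoPoolMode      = true            // let Nextflow create pools on demand
        allowPoolCreation = true
        deletePoolsOnCompletion = true
    }
}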

adamrtalbot avatar Jun 04 '25 10:06 adamrtalbot

Integration tests failing, looks unrelated.

adamrtalbot avatar Jun 04 '25 10:06 adamrtalbot

Retry now

pditommaso avatar Jun 04 '25 12:06 pditommaso

Retry now

Done!

adamrtalbot avatar Jun 04 '25 15:06 adamrtalbot

Related issue on Slack: https://nfcore.slack.com/archives/C02T98A23U7/p1753954588096009

Hi! Unsure if I should post here or in #nextflow-plugins, as it concerns help with a plugin. The plugin in question is nf-azure, where, in the current state, jobs are only deleted after completion of the workflow with AzBatchService.cleanupJobs(). The problem is that a job with all tasks completed still counts towards the job quota, thus allowing only 3-4 pipelines to run instead of 50+ (with 2-3 tasks each). This is why I want to periodically run a mid-workflow cleanup in the AzBatchService, such as:

protected void cleanupCompletedJobsMidRun() {
    for (String jobId : allJobIds.values()) {
        try {
            def tasks = client.listTasks(jobId)
            if (tasks.every { it.state.toString() in ['COMPLETED'] }) {
                log.trace "Deleting Azure job ${jobId} mid-run"
                apply(() -> client.deleteJob(jobId))
            }
        }
        catch (Exception e) {
            log.debug "Skipping mid-run cleanup for ${jobId} - ${e.message ?: e}"
        }
    }
}

My problem is that this implies a modification of the nf-azure plugin, which, being a core plugin, must be handled differently than a custom plugin derived from nf-hello. Apart from the usual plugin packaging, my main question is how to override the stock nf-azure plugin in PluginsFacade so that my own executor is picked up in:

protected List<PluginSpec> defaultPluginsConf(Map config) {
        // retrieve the list from the env var
        final commaSepList = env.get('NXF_PLUGINS_DEFAULT')
        if( commaSepList && commaSepList !in ['true','false'] ) {
            // if the plugin id in the list does *not* contain the @version suffix, it picks the version
            // specified in the defaults list. Otherwise parse the provider id@version string to the corresponding spec
            return commaSepList
                    .tokenize(',')
                    .collect( it-> defaultPlugins.hasPlugin(it) ? defaultPlugins.getPlugin(it) : PluginSpec.parse(it) )
        }

        final plugins = new ArrayList<PluginSpec>()
        final workDir = config.workDir as String
        final bucketDir = config.bucketDir as String
        final executor = Bolts.navigate(config, 'process.executor')

        if( executor == 'awsbatch' || workDir?.startsWith('s3://') || bucketDir?.startsWith('s3://') || env.containsKey('NXF_ENABLE_AWS_SES') )
            plugins << defaultPlugins.getPlugin('nf-amazon')

        if( executor == 'google-lifesciences' || executor == 'google-batch' || workDir?.startsWith('gs://') || bucketDir?.startsWith('gs://')  )
            plugins << defaultPlugins.getPlugin('nf-google')

        if( executor == 'azurebatch' || workDir?.startsWith('az://') || bucketDir?.startsWith('az://') )
            plugins << defaultPlugins.getPlugin('nf-azure')

...
            
        return plugins
    }

Is this problem easily solvable, or should we focus on balancing the load over multiple Batch accounts instead of relying on only one?

adamrtalbot avatar Jul 31 '25 12:07 adamrtalbot

Hi! Posting my comment here as indicated in Slack. Our use case is relatively simple: we just run big pipelines (i.e. with many jobs), and with completed jobs being deleted only when all tasks are completed (or cancelled), we reach our quota relatively quickly, allowing us to have only a few pipelines running (with 2-3 active jobs each) instead of dozens.

If we did not care about resuming a workflow, then, since task outputs are stored outside of the workdir, a simple flag to delete the job once the last task has finished its lifecycle would be all we need.

The downside is that to "resume" a workflow, we must find a way to tell Nextflow that task outputs already exist and are stored at location X, as well as supply them. Which means the easiest solution would be to relaunch the whole workflow should an error arise somewhere during processing.

ghislaindemael avatar Aug 06 '25 07:08 ghislaindemael

In which case, sometimes losing resume is an acceptable tradeoff for having a pipeline complete at all.

Let's do this:

  • [ ] Feature flag this so we can turn it off and on
  • [ ] Add a try/catch for submitting tasks so it will create a new job if the existing job is terminated

adamrtalbot avatar Aug 06 '25 09:08 adamrtalbot

since task outputs are stored outside of the workdir

@ghislaindemael I'm not sure I understand this; task outputs have to be in the working directory. Even if they're published to a new location, they are copied out by Nextflow itself after the task has completed.

adamrtalbot avatar Aug 06 '25 09:08 adamrtalbot

@adamrtalbot

My error; indeed, I meant that since we publish the results outside of the workdir (e.g. to Blob Storage), we can query them from there and thus delete them from the Batch VMs to reduce the load and free up jobs for the quota.

ghislaindemael avatar Aug 08 '25 09:08 ghislaindemael

we can query them from there and thus delete them from the Batch VMs to reduce the load and free up jobs for the quota.

To clarify the flow here:

  1. A job is created on Azure Batch by Nextflow and assigned to a Node Pool
  2. One or more tasks are added to the Job
  3. Each task is assigned to a node
  4. The task starts
    • It downloads the input files from Azure Blob storage to the local node
    • It creates the output files on the local node
    • It uploads the output files back to Azure Blob storage at the working directory
  5. The task completes
    • The output files on the local node are deleted to clear space for future tasks
    • (Optional): The output files are copied from the working directory to the publishing directory by Nextflow
  6. When the pipeline completes, Nextflow terminates the job created in step 1, preventing any new tasks from being added to the job and freeing up quota

This PR strictly refers to steps 1 and 6 and does not interact with any files. If you are having issues with file storage, running out of space, etc., that would be a different issue.

adamrtalbot avatar Aug 11 '25 11:08 adamrtalbot

@adamrtalbot we are also looking for a solution for this. We have executions that have hundreds of jobs sometimes, so, even in proper executions without errors, we are limited to 2 or 3 parallel executions per batch account. In our case, at any given moment, we would have something like:

Run 1: 200 jobs marked as finished (e.g. [100%] X of X ✔), 2 jobs ongoing with pending tasks, 20 jobs not started

Run 2: 57 jobs marked as finished (e.g. [100%] X of X ✔), 4 jobs ongoing with pending tasks, 200 jobs not started

So, from Azure's perspective, we have 200+2+57+4=263 jobs ongoing. As the runs progress, we have more and more jobs open and we reach the limit very quickly.

We are seeing if we can modify / extend the nf-azure plugin to handle this by adding some sort of cron job that deletes the jobs marked as finished (with a ✔), but you seem to say that this might cause issues when resuming the tasks? Why is that? It seems to me that the job names are different when it resumes the execution, no?

luanjot avatar Aug 12 '25 08:08 luanjot

but you seem to say that this might cause issues when resuming the tasks?

Not resume, but retry with an errorStrategy: https://www.nextflow.io/docs/latest/reference/process.html#errorstrategy

Here is the flow that may cause issues:

  1. A job is created for a process
  2. 5 tasks are submitted to the job
  3. Nextflow decides no more tasks will be submitted and closes the Job (sets to terminateOnCompletion)
  4. 4 tasks successfully complete, 1 task fails
  5. All tasks have completed, Azure Batch terminates the job
  6. The failed task tries to retry
  7. It gets submitted to a terminated job!
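
As a concrete example, a retry policy like the following (a rough sketch; the process name and command are placeholders) is what triggers the resubmission in step 6:

process MIGHT_FAIL {
    executor 'azurebatch'
    errorStrategy 'retry'   // failed tasks are resubmitted...
    maxRetries 2            // ...up to two more times

    script:
    """
    your_command_here
    """
}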

adamrtalbot avatar Aug 12 '25 09:08 adamrtalbot

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

luanjot avatar Aug 12 '25 09:08 luanjot

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

You might have the same issue, in that you terminate a job before you can resubmit a task.

adamrtalbot avatar Aug 12 '25 09:08 adamrtalbot

Note that none of this will help if you simply have too many genuinely active jobs. A job needs to be active to run a task, so if you just have a lot of work to do, this won't help.

Really, the issue is with the terrible design by Azure but given they just fired most of their genomics staff I doubt they will bother to help 🤷 .

adamrtalbot avatar Aug 12 '25 10:08 adamrtalbot

I have too many active jobs because Nextflow does not close them, not because they are actually active.

Any job that has been marked by Nextflow with a ✔ is, in my opinion, finished and will never be re-used for anything again; however, Nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days, the first job that finished 47 hours ago is still marked as "active" in Batch because Nextflow does not close it, even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

luanjot avatar Aug 12 '25 10:08 luanjot

Any job that has been marked by Nextflow with a ✔ is, in my opinion, finished and will never be re-used for anything again; however, Nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days, the first job that finished 47 hours ago is still marked as "active" in Batch because Nextflow does not close it, even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

Right - so with especially long running pipelines you have many jobs in active state which do not do anything.

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Here's an alternative implementation (which has the added benefit of making @pditommaso happy because it won't use the trace observer!):

  1. Create Job
  2. Submit task
  3. IMMEDIATELY set Job to terminate onAllBatchTasksComplete
  4. Add a try/catch so if you try to submit a Task to a Job in completed state, a new Job will be created, allowing more jobs to start.

This should eagerly terminate jobs while still allowing users to submit all tasks as normal.
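
In code, the idea is roughly as follows; the helper names (getOrCreateJob, addTaskToJob, createAutoTerminatingJob, isJobTerminatedError) are hypothetical placeholders rather than the actual nf-azure API:

// Sketch of the submit path with eager job termination and a retry on a terminated job
String submitTask(TaskRun task) {
    // steps 1-3: the job is created (or reused) already flagged with
    // onAllTasksComplete = TERMINATE_JOB, so Azure Batch terminates it
    // once every submitted task has finished
    final jobId = getOrCreateJob(task)
    try {
        return addTaskToJob(jobId, task)
    }
    catch (Exception e) {
        if( !isJobTerminatedError(e) )
            throw e
        // step 4: the job has already auto-terminated; create a fresh job
        // (also set to auto-terminate) and retry the submission once
        final newJobId = createAutoTerminatingJob(task)
        return addTaskToJob(newJobId, task)
    }
}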

adamrtalbot avatar Aug 12 '25 10:08 adamrtalbot

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Then how does it "decide" to add the ✔?

luanjot avatar Aug 12 '25 10:08 luanjot

The check mark is shown when no more tasks for that process need to be executed, i.e. the process execution is complete.

pditommaso avatar Aug 12 '25 11:08 pditommaso

The check mark is shown when no more tasks for that process need to be executed, i.e. the process execution is complete.

Excellent, can we use that logic to terminate the Azure Job?

adamrtalbot avatar Aug 12 '25 11:08 adamrtalbot

It could be done with a TraceObserver(V2). If I'm not wrong, you already made a PR for that.

pditommaso avatar Aug 12 '25 12:08 pditommaso

It could be done with a TraceObserver(V2). If I'm not wrong, you already made a PR for that.

I have, it's this one, but I've modified the behaviour now.

@luanjot I've updated the behaviour so now:

  • All Jobs are immediately set to terminate after all Tasks complete
  • Any subsequent Tasks are still submitted to the same Job
  • If a Job is found to be in terminated state when a Task is added, Nextflow will just create a new Job (also set to automatically terminate).

The advantage here is:

  • The Job will eagerly terminate as soon as all tasks have finished
  • This behaviour is controlled by Azure Batch, so we can defer any of that logic to their side. If Nextflow dies, the job still terminates.
  • From a user perspective, very little changes, other than azure.batch.terminateJobsOnCompletion taking effect earlier.
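
For reference, that option already exists in nextflow.config; a minimal sketch (the value shown is just an example):

azure {
    batch {
        // let Nextflow mark Batch jobs for automatic termination
        terminateJobsOnCompletion = true
    }
}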

adamrtalbot avatar Aug 12 '25 13:08 adamrtalbot

@luanjot and @ghislaindemael, I would appreciate it if you could give this development branch some extra testing, provided you can do a local build of Nextflow. I've run it through a bunch of pipelines, but more UAT is always helpful!

adamrtalbot avatar Aug 14 '25 11:08 adamrtalbot

It looks neat, but I'm not getting the general logic. Can you provide some hints?

pditommaso avatar Aug 14 '25 11:08 pditommaso

Added some comments in https://github.com/nextflow-io/nextflow/pull/6159/commits/df102dadd279bf39fec387f4678dae35166cf885 referring to https://github.com/nextflow-io/nextflow/pull/6159#issuecomment-3179253674

adamrtalbot avatar Aug 14 '25 15:08 adamrtalbot

@adamrtalbot i've updated the PR description for you 😎

pditommaso avatar Aug 14 '25 16:08 pditommaso

One thing that I'm not getting: if the job is created with the flag TERMINATE_JOB on creation

OnAllBatchTasksComplete.TERMINATE_JOB

why is it still needed to update the job with the same flag on completion?

https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071

pditommaso avatar Aug 14 '25 17:08 pditommaso

One thing that I'm not getting: if the job is created with the flag TERMINATE_JOB on creation

OnAllBatchTasksComplete.TERMINATE_JOB

why is it still needed to update the job with the same flag on completion?

https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071

Good point - it's the previous old code hanging around.

On one hand, it's a good idea to go around afterwards and definitively mark them as complete, because doing so has zero risk and skipping it can cause many issues.

On the other hand, it's pointlessly doing more API calls.

This is the only situation I can think of where a job wouldn't be set to auto-terminate:

  1. Nextflow creates a Job
  2. Nextflow tries to add a Task to a Job, but it fails (e.g. API error)
  3. Nextflow terminates
  4. Nextflow leaves job in active state
  5. Quota is consumed 😨

In which case, leaving it in will help.

adamrtalbot avatar Aug 14 '25 17:08 adamrtalbot

From my point of view it would be better to keep only the flag set on job creation (why should it fail?).

The update on terminate does not scale well with a large number of tasks

pditommaso avatar Aug 14 '25 17:08 pditommaso