feat: Azure Batch eagerly terminates jobs after all tasks have been submitted
Azure Batch "job leak" is still an issue. This commit fixes #5839 which allows Nextflow to set jobs to auto terminate when all tasks have been submitted. This means that eventually jobs will move into terminated state even if something prevents nextflow reaching a graceful shutdown. Very early implementation and needs some refinement.
Code wise, this replaces terminateJobs, with setJobTermination, which sets the auto termination status of the jobs. This is called in a few places:
- When the
terminateJobsis called at graceful shutdown by Nextflow (old behaviour) - When
setAutoJobTerminationis called on a specific job
Then a TraceObserver is added for Azure Batch which will call setJobAutoTerminate when onProcessTerminate is called.
Very out of my depth in this part of the code base, so expect things to be wrong.
Deploy Preview for nextflow-docs-staging ready!
| Name | Link |
|---|---|
| Latest commit | bc6b754642ce838edefa0ef697e9b9bbbfc4f370 |
| Latest deploy log | https://app.netlify.com/projects/nextflow-docs-staging/deploys/691b0819199f4a00080de58f |
| Deploy Preview | https://deploy-preview-6159--nextflow-docs-staging.netlify.app |
| Preview on mobile | Toggle QR Code...Use your smartphone camera to open QR code link. |
To edit notification comments on pull requests, go to your Netlify project configuration.
Example Nextflow pipeline:
process GREETING {
executor 'azurebatch'
container 'ubuntu:22.04'
input:
val greeting
output:
path "output.txt"
script:
"""
echo $greeting > output.txt
"""
}
process SLEEP {
executor 'azurebatch'
container 'ubuntu:22.04'
input:
path inputfile
output:
stdout
script:
"""
sleep 360
cat ${inputfile}
"""
}
workflow {
Channel.of("hello", "bonjour", "gutentag")
| GREETING
| SLEEP
}
Integration tests failing, looks unrelated.
Retry now
Retry now
Done!
Related issue on Slack: https://nfcore.slack.com/archives/C02T98A23U7/p1753954588096009
Hi ! Unsure if I should post here or in #nextflow-plugins, as it is concerns help about a plugin. Concerned plugin is nf-azure , where, in the current state jobs are only deleted after completion of the workflow with AzBatchService.cleanupJobs() The problem is that a job with all completed tasks still count towards the job quota ; thus allowing only 3-4 pipelines to run instead of 50+ (with 2-3 tasks each) This is why I want to periodically run a mid-workflow cleanup in the AzBatchService such as :
protected void cleanupCompletedJobsMidRun() { for (String jobId : allJobIds.values()) { try { def tasks = client.listTasks(jobId) if (tasks.every { it.state.toString() in ['COMPLETED'] }) { log.trace "Deleting Azure job ${jobId} mid-run" apply(() -> client.deleteJob(jobId)) } } catch (Exception e) { log.debug "Skipping mid-run cleanup for ${jobId} - ${e.message ?: e}" } } }My problem is that it implies a modification of the nf-azure plugin, which being a core plugin, must be handled differently that a custom plugin derived from nf-hello Apart from the natural plugin packaging, my main help wanted is on the overriding of the proper nf-azure plugin in PluginsFacade , so I have my own executor in :
protected List<PluginSpec> defaultPluginsConf(Map config) { // retrieve the list from the env var final commaSepList = env.get('NXF_PLUGINS_DEFAULT') if( commaSepList && commaSepList !in ['true','false'] ) { // if the plugin id in the list does *not* contain the @version suffix, it picks the version // specified in the defaults list. Otherwise parse the provider id@version string to the corresponding spec return commaSepList .tokenize(',') .collect( it-> defaultPlugins.hasPlugin(it) ? defaultPlugins.getPlugin(it) : PluginSpec.parse(it) ) } final plugins = new ArrayList<PluginSpec>() final workDir = config.workDir as String final bucketDir = config.bucketDir as String final executor = Bolts.navigate(config, 'process.executor') if( executor == 'awsbatch' || workDir?.startsWith('s3://') || bucketDir?.startsWith('s3://') || env.containsKey('NXF_ENABLE_AWS_SES') ) plugins << defaultPlugins.getPlugin('nf-amazon') if( executor == 'google-lifesciences' || executor == 'google-batch' || workDir?.startsWith('gs://') || bucketDir?.startsWith('gs://') ) plugins << defaultPlugins.getPlugin('nf-google') if( executor == 'azurebatch' || workDir?.startsWith('az://') || bucketDir?.startsWith('az://') ) plugins << defaultPlugins.getPlugin('nf-azure') ... return plugins }Is this problem easily solvable, or should we focus on balancing the load over multiple batch accounts instead of relying on only one ?
Hi ! Posting my comment here as indicated in Slack. Our 'use case' is relatively simple, we just run big pipelines (aka with many jobs), and with completed jobs being deleted only when all tasks are completed (or cancelled), we reach our quota relatively quickly, allowing us to have only a few pipelines running with 2-3 active jobs instead of dozens.
Should we not care about resuming a workflow, since task outputs are stored outside of the workdir, a simple flag to delete the job once the last task has finished it's lifecycle would be all we need.
The downside is that to "resume" a workflow, me must find a way to tell Nextflow that task outputs already exist and are stored at X location, as well as supply them. Which leads to the fact that the easiest solution would be to relaunch the whole workflow should we have an error arise somewhere during processing.
In which case, sometimes losing resume is an acceptable tradeoff for having a pipeline complete at all.
Let's do this:
- [ ] Feature flag this so we can turn it off and on
- [ ] Add a try/catch for submitting tasks so it will create a new job if the existing job is terminated
since task outputs are stored outside of the workdir
@ghislaindemael I'm not sure I understand this; Task outputs have to be in the working directory. Even if they're published to a new location, they are copied out after the task is completed by Nextflow itself.
@adamrtalbot
My error, indeed I meant as we publish the results outside of the workdir (e.g. in Blob Storage), we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.
we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.
To clarify the flow here:
- A job is created on Azure Batch by Nextflow and assigned to a Node Pool
- One or more tasks are added to the Job
- Each task is assigned to a node
- The task starts
- It downloads the input files from Azure Blob storage to the local node
- It creates the output files on the local node
- It uploads the output files back to Azure Blob storage at the working directory
- The task completes
- The output files on the local node are deleted to clear space for future tasks
- (Optional): The output files are copied from the working directory to the publishing directory by Nextflow
- When the pipeline completes, Nextflow will terminate the job created in 1, preventing any new tasks being added to the job and clearing quota
This PR strictly refers to 1 and 9 and does not interact with any files. If you are having issues with file storage, running out of space, etc., this would be an different issue.
@adamrtalbot we are also looking for a solution for this. We have executions that have hundreds of jobs sometimes, so, even in proper executions without errors, we are limited to 2 or 3 parallel executions per batch account. In our case, at any given moment, we would have something like:
Run1:
200 jobs marked as finished, e.g. [100%] X of X ✔
2 jobs ongoing with pending tasks
20 jobs not started
Run2:
57 jobs marked as finished, e.g. [100%] X of X ✔
4 jobs ongoing with pending tasks
200 jobs not started
So, from Azure's perspective, we have 200+2+57+4=263 jobs ongoing. As the runs progress, we have more and more jobs open and we reach the limit very quickly.
We are seeing if we can modify / extend the nf-azure plugin to handle this by adding some sort of cron job that deletes the jobs marked as finished (with a ✔), but you seem to say that this might cause issues when resuming the tasks? Why is that? It seems to me that the job names are different when it resumes the execution, no?
but you seem to say that this might cause issues when resuming the tasks?
Not resume, but retry with an errorStrategy: https://www.nextflow.io/docs/latest/reference/process.html#errorstrategy
Here is the flow that may cause issues:
- A job is created for a process
- 5 tasks are submitted to the job
- Nextflow decides no more tasks will be submitted and closes the Job (sets to terminateOnCompletion)
- 4 tasks successfully complete, 1 task fails
- All tasks have completed, Azure Batch terminates the job
- The failed task tries to retry
- It gets submitted to a terminated job!
So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?
So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?
You might have the same issue, in that you terminate a job before you can resubmit a task.
Note none of this will help you if you just have too many active jobs. A job needs to be active to run a task, so if you just have a lot of work to do this wont help.
Really, the issue is with the terrible design by Azure but given they just fired most of their genomics staff I doubt they will bother to help 🤷 .
I have too many active jobs because nextflow does not close them, not because they are actually active.
Any job that has been marked by nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything never again, however, nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days to run, the first task that finished 47 hours ago is still marked as "active" in batch because Nextflow does not close it even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.
Any job that has been marked by nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything never again, however, nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days to run, the first task that finished 47 hours ago is still marked as "active" in batch because Nextflow does not close it even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.
Right - so with especially long running pipelines you have many jobs in active state which do not do anything.
Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.
Here's an alternative implementation (which has the added benefit of making @pditommaso happy because it wont use the trace observer!):
- Create Job
- Submit task
- IMMEDIATELY set Job to terminate onAllBatchTasksComplete
- Add a try/catch so if you try to submit a Task to a Job in completed state, a new Job will be created, allowing more jobs to start.
This should eagerly terminate jobs while still allowing users to submit all tasks as normal.
Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.
Then how does it "decide" to add the ✔?
The check is shown when no more tasks for that process need to be executed ie. the process execution is complete
The check is shown when no more tasks for that process need to be executed ie. the process execution is complete
Excellent, can we use that logic to terminate the Azure Job?
It could be done with a TraceObserver(V2). If i'm not wrong you already made a pr for that
It could be done with a TraceObserver(V2). If i'm not wrong you already made a pr for that
I have, it's this one but I've modified the behaviour now.
@luanjot I've updated the behaviour so now:
- All Jobs are immediately set to terminate after all Tasks complete
- Any subsequent Tasks are still submitted to the same Job
- If a Job is found to be in terminated state when a Task is added, Nextflow will just create a new Job (also set to automatically terminate).
The advantage here is:
- The Job will eagerly terminate as soon as all tasks have finished
- This behaviour is controlled by Azure Batch, so we can defer any of that logic to their side. If Nextflow dies, the job still terminates.
- From a user perspective, very little changes other than
azure.batch.terminateJobsOnCompletionshould work earlier.
@luanjot and @ghislaindemael, I would appreciate it if you could give this development branch some extra testing, provided you can do a local build of Nextflow. I've ran it through a bunch of pipelines but more UAT is always helpful!
It looks neat but not getting the general logic. Can you provide some hint ?
Added some comments in https://github.com/nextflow-io/nextflow/pull/6159/commits/df102dadd279bf39fec387f4678dae35166cf885 referring to https://github.com/nextflow-io/nextflow/pull/6159#issuecomment-3179253674
@adamrtalbot i've updated the PR description for you 😎
One thing that i', not getting, if the job is created with the flag TERMINATE_JOB on creation
OnAllBatchTasksComplete.TERMINATE_JOB
why it still needed to update the job with the same flag on completion?
https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071
One thing that i', not getting, if the job is created with the flag TERMINATE_JOB on creation
OnAllBatchTasksComplete.TERMINATE_JOB
why it still needed to update the job with the same flag on completion?
https://github.com/nextflow-io/nextflow/blob/5839_azure_batch_jobs_terminate_upon_completion/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy#L1068-L1071
Good point - it's the previous old code hanging around.
On one hand, it's a good idea to go around afterwards and definitely mark them as complete, because it has zero risk and can cause many issues if it isn't done.
On the other hand, it's pointlessly doing more API calls.
This is only situation I can think of where a job wouldn't be set to autoterminate:
- Nextflow creates a Job
- Nextflow tries to add a Task to a Job, but it fails (e.g. API error)
- Nextflow terminates
- Nextflow leaves job in active state
- Quota is consumed 😨
In which case, leaving it in will help.
From my point of view it would be better to keep only the cleanup on jon creation (why it should fail?).
The update on terminate does not scale well with a large number of tasks