Nextflow configuration for spot workers

Open mzueva opened this issue 7 months ago • 0 comments

Background

Cloud Pipeline can use spot instances to reduce costs. NextFlow is a widely used workflow manager that simplifies running and scaling pipelines, and nf-core is a community-driven repository of over a hundred standardized, curated NextFlow pipelines for bioinformatics. Cloud Pipeline configures a Grid Engine (SGE) cluster, which NextFlow can use as an executor. The solution should preferably not require changes to the pipeline code.

Approach

Using spot instances (and cloud-based computing in general) requires a retry mechanism. The available options are:

  1. NextFlow process-level retry
  2. SGE-level built-in automatic task rescheduling
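
As a minimal sketch of option 1, a NextFlow config fragment along the following lines turns on process-level retries for every process. The retry limit of 3 is an illustrative value, not one taken from Cloud Pipeline.

```groovy
// Illustrative nextflow.config fragment: process-level retry on an SGE cluster.
process {
    executor      = 'sge'     // submit tasks to the Grid Engine cluster
    errorStrategy = 'retry'   // resubmit a failed task instead of stopping the run
    maxRetries    = 3         // assumed value; each resubmission counts against it
}
```

With these settings every failure, including a reclaimed spot worker, consumes one of the retries.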

NextFlow configs

NextFlow supports additional configuration files that let users override pipeline process parameters and customize the environment while keeping the main workflow code intact.

Two NextFlow behaviors matter here. First, NextFlow recreates the process working folder, including the input data, for each process retry. Second, pipeline tasks can define rules that increase resource requirements on each retried attempt. For that reason the spot retry mechanism should not increment the attempt counter: otherwise a task retried only because its worker was reclaimed would request more resources than it needs. In addition, when the load is high and spot instances are unavailable (or are terminated soon after they start), NextFlow may exhaust its limit of job retry attempts.
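
The conflict can be illustrated with a typical resource escalation rule. This is a sketch only: the process name ALIGN and the numbers are hypothetical.

```groovy
// Illustrative pipeline-side settings that scale resources with task.attempt.
process {
    withName: 'ALIGN' {                          // hypothetical process name
        memory        = { 8.GB * task.attempt }  // 8 GB, then 16 GB, then 24 GB ...
        errorStrategy = 'retry'
        maxRetries    = 3                        // up to 4 attempts in total
    }
}
```

If spot workers are reclaimed during attempts 1 and 2, attempt 3 requests 24 GB although the task never ran out of memory, and only one retry is left.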

SGE

The main concern at the SGE level is that a rescheduled task may be unable to overwrite or recreate output files that the previous attempt left in the process working folder. Clearing the whole working folder is not an option, because it usually already contains links to the input data files before the process script starts.
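
For reference, SGE-level rescheduling could be requested from NextFlow by passing the qsub rerun flag through the clusterOptions directive. This is an untested sketch, and it does not address the leftover output files described above.

```groovy
// Illustrative fragment: ask SGE to rerun a job whose execution host was lost.
process {
    executor       = 'sge'
    clusterOptions = '-r y'   // qsub option: mark the job as rerunnable
}
```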

Notes

  1. NextFlow's detection of a killed SGE worker or process could be improved by using accounting data. Currently, the NextFlow SGE executor takes more than 5 minutes to detect a killed process.
  2. A NextFlow plugin could provide an even more flexible process retry mechanism with less interference with the pipeline settings.
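
Regarding note 1, part of the detection delay may come from the executor timing settings rather than from SGE itself. NextFlow exposes queueStatInterval and exitReadTimeout in the executor scope; the values below are illustrative assumptions, and setting exitReadTimeout too low risks false failures on slow shared file systems.

```groovy
// Illustrative fragment: shorten the time NextFlow waits before it treats
// a task without an '.exitcode' file as failed.
executor {
    name              = 'sge'
    queueStatInterval = '30 sec'    // how often the queue status is polled
    exitReadTimeout   = '120 sec'   // how long to wait for a missing '.exitcode' file
}
```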

Usage

Pass the additional config file through the NextFlow command line option -c (-c fix-retry-wo-exitcode.config).

fix-retry-wo-exitcode.config

// Maximum number of extra retries granted to a task whose attempts ended without an exit code
def maxRetriesWoExitcode = 7

def logLevel = 2 // 0 - error, 1 - warn, 2 - info, 3 - debug
def logPrefix = 'Fix retry: '

def backupPrev = { task ->
    if (logLevel >= 3) println("${logPrefix} backupPrev, task: ${task}")
    return [ errorStrategy: task.errorStrategy ]
}

def restorePrev = { task, prev ->
    if (logLevel >= 3) println("${logPrefix} restorePrev, task: ${task}")
    task.errorStrategy = prev.errorStrategy
}

def buildErrorStrategyHandler = { prev ->
    return {
        if (logLevel >= 3) println("${logPrefix} errorStrategyHandler, task: ${task}")
        /*
        If there was no error, the task will not be run again,
        so it is safe to leave TaskConfig.errorStrategy overridden.
        */
        def res = prev.errorStrategy
        try {
            def exitcodeFileExists = new File((task.workDir / '.exitcode').toString()).exists()
            if (logLevel >= 2) println("${logPrefix} check, '.exitcode' file: ${exitcodeFileExists}, task.exitStatus: ${task.exitStatus}")
            // A missing '.exitcode' file or exit status 137 (128 + SIGKILL) is treated as a killed worker
            if (!exitcodeFileExists || task.exitStatus == 137) {
                res = 'RETRY'
                task.retryWoExitcode = true
                task.retryWoExitcodeCount = (task.retryWoExitcodeCount ?: 0) + 1
            }
        }
        catch(Exception ex)  {
            if (logLevel >= 0) println("${logPrefix} ERROR: ${ex}")
        } finally {
            restorePrev(task, prev)
        }
        if (logLevel >= 2) println("${logPrefix} errorStrategyHandler, res: ${res}")
        return res
    }
}

process {
    beforeScript = {
        if (logLevel >= 2) println("${logPrefix} beforeScript")

        if (task.retryWoExitcode) {
            if (task.retryWoExitcodeCount < maxRetriesWoExitcode) task.maxRetries = (task.maxRetries ?: 1) + 1
            task.attempt -= task.retryWoExitcodeCount ?: 0

            if (logLevel >= 2) {
                println("${logPrefix} WORKER KILLED" +
                    ", retryWoExitcodeCount: ${task.retryWoExitcodeCount}" +
                    ", adjust maxRetries: ${task.maxRetries}" +
                    ", adjust task.attempt: ${task.attempt}")
            }

            // temporary adjustment (let's see whether process resources calc respects this)
            // task.errorCount -= task.retryWoExitcodeCount ?: 0
            // task.retryCount -= task.retryWoExitcodeCount ?: 0

            task.retryWoExitcode = null
        }

        // 'def' keeps prev local to this invocation instead of sharing it between tasks
        def prev = backupPrev(task)
        task.errorStrategy = buildErrorStrategyHandler(prev)
        // Note: NextFlow accesses task.maxRetries only once
        if (logLevel >= 3) println("${logPrefix} beforeScript, task: ${task}")
        return null; // this (last expression) outputs to .command.run
    }
}

mzueva, Mar 14 '25 10:03