
Process cache groups

Open adamrtalbot opened this issue 6 months ago • 1 comment

New feature

Sometimes an error in a process is only exposed later on in the pipeline, when a downstream process fails. In these cases, it would be useful to force resume to restart from an earlier step in the pipeline rather than from the failed process.

Usage scenario

Let's imagine a scenario where we have three processes. The first is non-deterministic, because it uses a fancy new AI algorithm. The second and third use the output of the first process in sequence; however, sometimes the third process will fail because the algorithm doesn't reach equilibrium or something. We might try to solve this by using the resume feature of Nextflow to catch the error, but this will skip processes 1 and 2 and jump straight to 3. This might just repeat the error, so we would prefer to start from process 1 again. Here's a minimal example:

params.exitcode = 1

process RANDOM {
    output:
    path("output.txt")

    script:
    """
    echo \$RANDOM > output.txt
    """
}

process DO_THING_WITH_RANDOM {
    input:
    path "input.txt"

    output:
    path("output.txt")

    script:
    """
    cat input.txt > output.txt
    """
}

process FAIL_WITH_RANDOM {
    input:
    path "input.txt"
    val exitcode

    output:
    path "output.txt"

    script:
    """
    cat input.txt > output.txt
    exit $exitcode
    """
}

workflow {
    RANDOM()
    DO_THING_WITH_RANDOM(RANDOM.out)
    FAIL_WITH_RANDOM(DO_THING_WITH_RANDOM.out, params.exitcode)
}

In this case, there is nothing we can do to make RANDOM restart when using -resume, even though its output will change every time we run it.
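The closest existing tool is the `cache` directive: setting `cache false` on RANDOM forces it (and therefore everything downstream) to rerun on every resume. This is a blunt instrument, though, since it reruns the process even when nothing failed, which is exactly why a grouped invalidation mechanism would help. A sketch using the current directive:

```nextflow
process RANDOM {
    // Existing workaround: never cache this process, so -resume always
    // re-executes it and, transitively, all downstream processes.
    // The cost: it also reruns on perfectly successful resumes.
    cache false

    output:
    path("output.txt")

    script:
    """
    echo \$RANDOM > output.txt
    """
}
```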

Suggested implementation

We could 'group' caches, so that if any cache entry within a set is invalidated, we restart from all of them. For example, we could add a key value which can be used to associate processes by sample ID:

process MYPROCESS { 
    cache true, key: id
    
    input:
    tuple val(id), path(bam), path(bai)
    ...
}

Alternatively, we could provide the tools for developers to add this to the errorStrategy, so the behaviour could be baked into the pipeline itself. This might follow a similar pattern:

process MYPROCESS { 
    errorStrategy "retry"
    errorGroup id
    
    input:
    tuple val(id), path(bam), path(bai)
    ...
}

adamrtalbot avatar Aug 19 '24 17:08 adamrtalbot