Process cache groups
New feature
Sometimes an error in a process is only exposed later in the pipeline, when a downstream process fails. In these cases, it would be useful to force resume to restart from an earlier step in the pipeline rather than from the failed process.
Usage scenario
Let's imagine a scenario where we have three processes. The first is non-deterministic, because it uses a fancy new AI algorithm. The second and third consume the output of the first in sequence, but the third sometimes fails, for example because the algorithm doesn't reach equilibrium. We might try to handle this by re-running with Nextflow's resume feature, but resume will skip processes 1 and 2 (their results are cached) and jump straight to process 3. That may just repeat the error, so we would prefer to start from process 1 again. Here's a minimal example:
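// Exit code used by FAIL_WITH_RANDOM; the default of 1 makes it fail
params.exitcode = 1

// Non-deterministic step: writes a different number on every execution
process RANDOM {
    output:
    path("output.txt")

    script:
    """
    echo \$RANDOM > output.txt
    """
}

// Deterministic step that passes the random value along
process DO_THING_WITH_RANDOM {
    input:
    path "input.txt"

    output:
    path("output.txt")

    script:
    """
    cat input.txt > output.txt
    """
}

// Downstream step that exits with the configured exit code
process FAIL_WITH_RANDOM {
    input:
    path "input.txt"
    val exitcode

    output:
    path "output.txt"

    script:
    """
    cat input.txt > output.txt
    exit $exitcode
    """
}

workflow {
    RANDOM()
    DO_THING_WITH_RANDOM(RANDOM.out)
    FAIL_WITH_RANDOM(DO_THING_WITH_RANDOM.out, params.exitcode)
}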
In this case, there is nothing we can do to make RANDOM restart when using -resume, even though its output will change every time we run it.
Suggested implementation
We could 'group' caches, so that if any cache within a group is invalidated, every process in that group is re-run on resume. For example, we could add a key to the cache directive that associates processes by sample ID:
process MYPROCESS {
    cache true, key: id

    input:
    tuple val(id), path(bam), path(bai)

    ...
}
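To make the intended semantics a bit more concrete, here is a rough Python model of how grouped invalidation might behave. This is only a sketch of the idea, not how Nextflow's cache works internally: the function name tasks_to_rerun, the group_of mapping and the "sampleA" key are all hypothetical and exist purely for illustration.

```python
def tasks_to_rerun(group_of, cached, failed):
    """Return the set of tasks that should execute on resume.

    group_of: dict mapping task name -> cache group key (None = ungrouped)
    cached:   set of task names that have a cache entry from the last run
    failed:   set of task names whose last execution failed
    (All three are hypothetical inputs for this illustration.)
    """
    # A task is stale if it has no cache entry or its last run failed.
    stale = {t for t in group_of if t not in cached or t in failed}
    # Collect every group key touched by at least one stale task.
    stale_groups = {group_of[t] for t in stale if group_of[t] is not None}
    # Re-run stale tasks plus every other member of a touched group.
    return {t for t, g in group_of.items() if t in stale or g in stale_groups}


# The three processes from the example share one group; OTHER is ungrouped.
group_of = {
    "RANDOM": "sampleA",
    "DO_THING_WITH_RANDOM": "sampleA",
    "FAIL_WITH_RANDOM": "sampleA",
    "OTHER": None,
}
cached = {"RANDOM", "DO_THING_WITH_RANDOM", "OTHER"}
failed = {"FAIL_WITH_RANDOM"}

rerun = tasks_to_rerun(group_of, cached, failed)
print(sorted(rerun))
```

In this model, the failure of FAIL_WITH_RANDOM pulls RANDOM and DO_THING_WITH_RANDOM back into the run because they share a group key, while the ungrouped OTHER task keeps its cached result. If no task has a group key, only the failed task is re-run, which matches the resume behaviour described above.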
Alternatively, we could give developers the tools to attach this behaviour to the errorStrategy, so that it can be baked into the pipeline itself. This might follow a similar pattern:
process MYPROCESS {
    errorStrategy "retry"
    errorGroup id

    input:
    tuple val(id), path(bam), path(bai)

    ...
}