storeDir is not working
Bug report
Using storeDir makes the pipeline crash complaining that the required output file is not there any more.
Expected behavior and actual behavior
In the past, storeDir copied the files to another folder; now it moves them, and this breaks the workflow execution.
- Nextflow version: 19.10.0
Error executing process > 'buildIndex (Cowc_gDNA_mtDNA.fasta)'
Caused by:
Missing output file(s) `STARgenome` expected by process `buildIndex (Cowc_gDNA_mtDNA.fasta)`
Command executed:
mkdir STARgenome
if [ `echo Cowc_gDNA_mtDNA.fasta | grep ".gz"` ]; then
zcat Cowc_gDNA_mtDNA.fasta > `basename Cowc_gDNA_mtDNA.fasta .gz`
STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8 --genomeFastaFiles `basename Cowc_gDNA_mtDNA.fasta .gz` --sjdbGTFfile Cowc_long.annot.exon.gtf --sjdbOverhang 49 --outFileNamePrefix STARgenome ;
rm `basename Cowc_gDNA_mtDNA.fasta .gz`
else
STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8 --genomeFastaFiles Cowc_gDNA_mtDNA.fasta --sjdbGTFfile Cowc_long.annot.exon.gtf --sjdbOverhang 49 --outFileNamePrefix STARgenome
fi
Command exit status:
0
Command output:
Nov 05 10:29:40 ..... started STAR run
Nov 05 10:29:40 ... starting to generate Genome files
Nov 05 10:29:40 ... starting to sort Suffix Array. This may take a long time...
Nov 05 10:29:41 ... sorting Suffix Array chunks and saving them to disk...
Nov 05 10:29:45 ... loading chunks from disk, packing SA...
Nov 05 10:29:46 ... finished generating suffix array
Nov 05 10:29:46 ... generating Suffix Array index
Nov 05 10:29:52 ... completed Suffix Array index
Nov 05 10:29:52 ..... processing annotations GTF
Nov 05 10:29:52 ..... inserting junctions into the genome indices
Nov 05 10:30:03 ... writing Genome to disk ...
Nov 05 10:30:03 ... writing Suffix Array to disk ...
Nov 05 10:30:03 ... writing SAindex to disk
Nov 05 10:30:09 ..... finished successfully
It was a problem with our storage... I'm wondering if this kind of asynchronous copy can cause problems for pipelines.
I'll add a couple of suggestions: it looks like this problem is related to the fact that big files take time to copy to the final directory indicated by storeDir. So you might want to add a sleep either after the execution of the command or in an "afterScript" definition. Another solution could be to add a retry with an incremental sleep in the nextflow.config file.
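Something along these lines in nextflow.config could work (just a sketch; the sleep duration, retry count and back-off factor are arbitrary placeholders):

process {
    // give the shared filesystem a moment to settle after each task script ends
    afterScript   = 'sleep 30'

    // retry with an incremental back-off: wait longer before each new attempt
    errorStrategy = { sleep(Math.pow(2, task.attempt) * 1000 as long); return 'retry' }
    maxRetries    = 3
}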
From time to time I still see this problem with storeDir... I don't know how many people see it. Maybe @JoseEspinosa also experiences it.
It would be nice to have a function that allows some waiting time after the file is copied to the place indicated by storeDir, before the output check.
Are you trying to use the storeDir output in a downstream task within the same pipeline? I think storeDir is designed only to work across pipeline runs.
@pditommaso it looks like the task outputs are actually moved into the storeDir, which would prevent downstream tasks from being able to also use the output. Maybe instead we should keep a link in the task directory to the storeDir so that it can still be used as a regular output
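Roughly, the idea would be something like this (just a sketch with made-up names, not actual Nextflow internals):

import java.nio.file.Files
import java.nio.file.Path

// hypothetical helper: after an output has been moved into the storeDir,
// leave a symlink in the task work directory so downstream tasks can still
// resolve it as a regular output
static void linkStoredOutput(Path storedFile, Path taskWorkDir) {
    Path link = taskWorkDir.resolve(storedFile.fileName)
    if( !Files.exists(link) )
        Files.createSymbolicLink(link, storedFile)
}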
Hi @bentsherman, I'm trying to run the pipeline just once and it fails. Likely because the output file is checked in its final location while it is still being copied.
I think the presence of a soft link could be a patch (but I'm not so sure it will work with AWS Batch).
Recently I was reading about software-defined assets in Dagster, which looks like this:
from dagster import asset
from pandas import DataFrame

@asset
def logins(website_events: DataFrame) -> DataFrame:
    return website_events[website_events["type"] == "login"]
This flips the script: instead of saying "this is a task, here are its outputs", it says "this is a data asset, here is how to compute it". This is a very intuitive way to think about the pipeline, as it focuses on the thing we care about most, the results.
I wonder if we could do something like this in Nextflow. Ideally keeping the same syntax for processes, but making the workflow definition more output-centric.
I think we need to combine storeDir and publishDir into a single concept and make it more prominent in the workflow definition. I should be able to say "this asset should exist in (storeDir/publishDir), here's how to re-compute it if needed (process definition)":
Using rnaseq-nf as an example:
process INDEX {
    tag "$transcriptome.simpleName"
    conda 'salmon=1.10.2'

    input:
    path transcriptome

    output:
    path 'index'

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """
}

workflow {
    index = asset('/data/index') {
        INDEX(params.transcriptome)
    }
}
The main idea is to make the store/publish dir external to the process invocation. The workflow should request an asset from some local path and specify how to re-compute it; Nextflow can then figure out whether the requested asset is up to date and decide whether to re-compute it.
Sorry for the rant @lucacozzuto , I know you're just trying to fix the bug right in front of you, I just got inspired to think about the long-term solution. If you could provide a minimal test case, we should be able to get to the bottom of it
Don't worry, I love this way of thinking in broader terms... I just think that the output-centric way can be a bit reductive... A nice thing about Nextflow is that you can forget about the naming of inputs/outputs... So I would think twice about changing this aspect of the language. About storeDir, I'll work on a test case.
I agree with your point about naming things. Dagster seems to work well with the output-centric model because it's all just variable names and in-memory dataframes, so you don't have to worry about organizing files in a filesystem. Nextflow also saves you from having to name things all the time by having the hidden work directory and then you specify which outputs to publish and where. An output-centric model in Nextflow should continue in that pattern.
Maybe we can incorporate storeDir into the upcoming workflow publish definition #4784:
workflow {
    main:
    index = INDEX(params.transcriptome)

    publish:
    index >> '/data/index'
}
One thing we have added with this feature is an option to not re-publish a file if the publish destination is up to date, similar to the cache directive. I guess the natural next step would be: if all outputs published by a process are already up to date in the publish destination, don't re-execute the task.
Agree. However, I cannot always reproduce the error. So it likely fails when the (big) file has not been copied completely. storeDir is useful when you have a big task (like indexing) that you don't want to re-execute each time, so you can reuse the same file across different pipeline runs... but we really need some "waiting time" between the copy to the "store" folder and the check for the existence of the output file.
Looking at the code more closely, the way it works is:
- the job .command.run moves the output files to the storeDir using mv
- after the job is complete (including the mv command), Nextflow checks the storeDir for the output files as declared in the output: section
Because the mv command is part of the job, I would expect the storeDir to be up to date once the job is finished.
The problem happens on an HPC... so I think it is some problem with an asynchronous process.
The procedure I described is not asynchronous. Nextflow does not check the storeDir until after the job and the file transfer have completed.
I think a way to test your theory would be to copy a large file to the storeDir and then try to access the file from the storeDir
We noticed exactly the same problem in one of the epi2me ONT workflows today, and I agree with @lucacozzuto that it is a problem with asynchrony in an HPC environment. In our scenario the Nextflow head job and the job using storeDir were scheduled to two different nodes in our cluster, and both the workdir (NXF_WORK) and the storeDir are on a shared filesystem with some latency. The job with the storeDir process finished successfully (RC=0), i.e. nxf_unstage() in .command.run and subsequently nxf_fs_move() moved all the files to the storeDir. Then Nextflow (on a different node) checked the output and, due to slow NFS, did not find the file(s), finally resulting in a Missing output file(s) error.
I remember that the grid executors have this hack to deal with NFS delays when checking the exitcode file for a job: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy#L321-L330
Apparently if you list a directory it will force-update the NFS metadata. Might be able to do the same thing for storeDir, if someone would like to give it a try: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy#L850-L858
I think it would be enough to list the storeDir before calling collectOutputs()
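Something like this, just as a sketch (the helper name is made up; only the java.nio.file calls are real):

import java.nio.file.Files
import java.nio.file.Path

// hypothetical helper: listing the storeDir forces the NFS client to refresh
// its cached metadata, so files just moved there by the job become visible
// before the declared outputs are collected
static void refreshStoreDir(Path storeDir) {
    try {
        Files.newDirectoryStream(storeDir).withCloseable { stream ->
            stream.each { }   // touching each entry is enough to trigger the refresh
        }
    }
    catch( IOException e ) {
        // best effort only; if files are truly missing, the normal error still applies
    }
}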