nextflow icon indicating copy to clipboard operation
nextflow copied to clipboard

storeDir is not working

Open lucacozzuto opened this issue 5 years ago • 14 comments

Bug report

Using storeDir makes the pipeline crash complaining that the required output file is not there any more.

Expected behavior and actual behavior

In past storeDir was copying the files to another folder, now it moves them and this breaks the workflow execution.

  • Nextflow version: 19.10.0
Error executing process > 'buildIndex (Cowc_gDNA_mtDNA.fasta)'

Caused by:
  Missing output file(s) `STARgenome` expected by process `buildIndex (Cowc_gDNA_mtDNA.fasta)`

Command executed:

  mkdir STARgenome
  if [ `echo Cowc_gDNA_mtDNA.fasta | grep ".gz"` ]; then 
      zcat Cowc_gDNA_mtDNA.fasta > `basename Cowc_gDNA_mtDNA.fasta .gz`
      STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8             --genomeFastaFiles `basename Cowc_gDNA_mtDNA.fasta .gz` --sjdbGTFfile Cowc_long.annot.exon.gtf             --sjdbOverhang 49 --outFileNamePrefix STARgenome             ;
      rm `basename Cowc_gDNA_mtDNA.fasta .gz`
  else 
      STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8             --genomeFastaFiles Cowc_gDNA_mtDNA.fasta --sjdbGTFfile Cowc_long.annot.exon.gtf             --sjdbOverhang 49 --outFileNamePrefix STARgenome             
  fi

Command exit status:
  0

Command output:
  Nov 05 10:29:40 ..... started STAR run
  Nov 05 10:29:40 ... starting to generate Genome files
  Nov 05 10:29:40 ... starting to sort Suffix Array. This may take a long time...
  Nov 05 10:29:41 ... sorting Suffix Array chunks and saving them to disk...
  Nov 05 10:29:45 ... loading chunks from disk, packing SA...
  Nov 05 10:29:46 ... finished generating suffix array
  Nov 05 10:29:46 ... generating Suffix Array index
  Nov 05 10:29:52 ... completed Suffix Array index
  Nov 05 10:29:52 ..... processing annotations GTF
  Nov 05 10:29:52 ..... inserting junctions into the genome indices
  Nov 05 10:30:03 ... writing Genome to disk ...
  Nov 05 10:30:03 ... writing Suffix Array to disk ...
  Nov 05 10:30:03 ... writing SAindex to disk
  Nov 05 10:30:09 ..... finished successfully

lucacozzuto avatar Nov 05 '19 10:11 lucacozzuto

it was a problem with our storage... I''m wondering if this kind of asynchronous copy can give problem to the pipelines

lucacozzuto avatar Jan 21 '20 10:01 lucacozzuto

I'll add a couple of suggestions: it looks like this problem is related to the fact that big files take time to copy to the final directory indicated in storageDir. So you might want to add either a sleep() after the execution of the command or into a "afterScript" definition. Another solution can be to add a retry with an incremental sleep in the nextflow.config file

lucacozzuto avatar May 26 '22 12:05 lucacozzuto

From time to time I still see this problem with storeDir... I don't know how many people see it. Maybe @JoseEspinosa also experience it

lucacozzuto avatar Apr 26 '24 16:04 lucacozzuto

it would be nice to have a function that allows some waiting time before the file is copied to the place indicated by storeDir

lucacozzuto avatar Apr 26 '24 16:04 lucacozzuto

Are you trying to use the storeDir output in a downstream task within the same pipeline? I think storeDir is designed only to work across pipeline runs

@pditommaso it looks like the task outputs are actually moved into the storeDir, which would prevent downstream tasks from being able to also use the output. Maybe instead we should keep a link in the task directory to the storeDir so that it can still be used as a regular output

bentsherman avatar Apr 29 '24 14:04 bentsherman

Hi @bentsherman, I'm trying to run the pipeline just once and it fails. Likely because the process is looking at the output file in a position while it is being copied.

lucacozzuto avatar Apr 29 '24 14:04 lucacozzuto

I think the presence of a soft link can be a patch (but not so sure it will work in AWS. batch).

lucacozzuto avatar Apr 29 '24 14:04 lucacozzuto

Recently I was reading about software-defined assets in Dagster, which looks like this:

@asset
def logins(website_events: DataFrame) -> DataFrame:
   return website_events[website_events["type"] == "login"]

This flips the script, instead of saying "this is a task, here are it's outputs", it says "this is a data asset, here is how to compute it". This is a very intuitive way to think about the pipeline as it focuses on the thing we care about most, the results.

I wonder if we could do something like this in Nextflow. Ideally keeping the same syntax for processes, but making the workflow definition more output-centric.

I think we need to combine storeDir and publishDir into a single concept and make it more prominent in the workflow definition. I should be able to say "this asset should exist in (storeDir/publishDir), here's how to re-compute it if needed (process definition)":

Using rnaseq-nf as an example:

process INDEX {
  tag "$transcriptome.simpleName"
  conda 'salmon=1.10.2'
  input:
    path transcriptome
  output:
    path index
  script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """
}

workflow {
  index = asset('/data/index') {
    INDEX(params.transcriptome)
  }
}

The main idea is to make the store/publish dir external to the process invocation. The workflow should request an asset from some local path, show how to re-compute it, then Nextflow can figure out if the request asset is up to date and decide whether to re-compute it.

Sorry for the rant @lucacozzuto , I know you're just trying to fix the bug right in front of you, I just got inspired to think about the long-term solution. If you could provide a minimal test case, we should be able to get to the bottom of it

bentsherman avatar Apr 29 '24 17:04 bentsherman

Don't worry I love this way of thinking more in broad terms... I just think that the output centric way can be a bit reductive... A nice thing about Nextflow is to forget about the naming of input / outputs... So I would think twice about changing this assets of the language. About storedir I'll work on a test case

lucacozzuto avatar Apr 29 '24 17:04 lucacozzuto

I agree with your point about naming things. Dagster seems to work well with the output-centric model because it's all just variable names and in-memory dataframes, so you don't have to worry about organizing files in a filesystem. Nextflow also saves you from having to name things all the time by having the hidden work directory and then you specify which outputs to publish and where. An output-centric model in Nextflow should continue in that pattern.

Maybe we can incorporate storeDir into the upcoming workflow publish definition #4784 :

workflow {
  main:
  index = INDEX(params.transcriptome)

  publish:
  index >> '/data/index'
}

One thing we have added with this feature is an option to not re-publish a file if the publish destination is up to date, similar to the cache directive. I guess the natural next step would be, if all outputs published by a process are already up to date in the publish destination, don't re-execute the task.

bentsherman avatar Apr 29 '24 18:04 bentsherman

Agree. However, I cannot always reproduce the error. So it is likely that if the (big) file is not copied completely, it fails. Storedir is useful when you have a big task (like indexing) that you don't want to re-execute each time and use the same file among different pipeline runs... but we really need some "waiting time" between the copy in the "store" folder and the check of the existence of the output file.

lucacozzuto avatar Apr 29 '24 18:04 lucacozzuto

Looking at the code more closely, the way it works is:

  • the job .command.run moves the output files to the storeDir using mv
  • after the job is complete (including the mv command), Nextflow checks the storeDir for the output files as declared in the output: section

Because the mv command is part of the job, I would expect the storeDir to be up to date once the job is finished

bentsherman avatar Apr 29 '24 18:04 bentsherman

the problem happens on a HPC... so I think is some problem with some asynchronous process

lucacozzuto avatar Apr 29 '24 21:04 lucacozzuto

The procedure I described is not asynchronous. Nextflow does not check the storeDir until after the job and file transfer has completed.

I think a way to test your theory would be to copy a large file to the storeDir and then try to access the file from the storeDir

bentsherman avatar Apr 29 '24 21:04 bentsherman

We noticed exactly the same problem in one of the epi2me ONT workflows today and I agree with @lucacozzuto that it is a problem with asynchrony in a HPC environment. In our scenario the Nextflow head job and the job using storeDir were scheduled to two different nodes in our cluster and workdir (NXF_WORK) and also the storeDir are on a shared filesystem with some latency. The job with the storeDir process finished successfully (RC=0), i.e. nxf_unstage() in .command.run and subsequently nx_fs_move() moves all the files to storeDir. Then Nextflow (on a different node) checks the output and due to slow NFS does not find the file(s) and finally resulting in a Missing output file(s) error.

mbeckste avatar Jul 04 '24 14:07 mbeckste

I remember that the grid executors have this hack to deal with NFS delays when checking the exitcode file for a job: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy#L321-L330

Apparently if you list a directory it will force-update the NFS metadata. Might be able to do the same thing for storeDir, if someone would like to give it a try: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy#L850-L858

I think it would be enough to list the storeDir before calling collectOutputs()

bentsherman avatar Jul 15 '24 14:07 bentsherman