
Identity-based provenance tracking

Open pditommaso opened this issue 10 months ago • 13 comments

This PR implements the ability to trace the full provenance of a Nextflow pipeline, so that once a task execution completes, it reports the set of direct upstream tasks that originated one or more of its inputs.

How it works

Each output value that's emitted by a task or an operator is wrapped with an object instance. This makes it possible to assign to each emitted value a unique identity based on the underlying Java object identity.

Each object is associated with the corresponding task or operator run (i.e. TaskRun and OperatorRun).

Once the output value is received as an input by a task, the upstream task is determined by inspecting the output-run association table.
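
A minimal sketch of the idea, using the identity hash code as a simplified stand-in for object identity (class and method names are illustrative, not the actual code in this PR; the name Msg is borrowed from the review comments below):

    import java.util.concurrent.ConcurrentHashMap

    // Wrapper that gives each emitted value a unique identity via the Java object identity
    class Msg {
        Object value
    }

    class ProvenanceTracker {
        // output-run association table: Msg identity -> the TaskRun/OperatorRun that emitted it
        private final Map<Integer,Object> origins = new ConcurrentHashMap<>()

        Msg wrapOutput(Object value, Object run) {
            def msg = new Msg(value: value)
            origins[System.identityHashCode(msg)] = run
            return msg
        }

        Object upstreamRunOf(Msg msg) {
            return origins[System.identityHashCode(msg)]
        }
    }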

Required changes

This approach requires enclosing each output value in a wrapper object and unwrapping it once it is received by the downstream task or operator, so that the corresponding operation is not altered.

The input unwrapping can easily be automated for both tasks and operators because they share a common message-receive interface.

However, the output wrapping requires modifying all Nextflow operators, because each of them has custom logic to produce its outputs.
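
Continuing the sketch above, the unwrapping step at the common receive point could look roughly like this (again a sketch, not the PR's actual code):

    // Unwrap an incoming value at the common message-receive point, recording its origin run
    Object unwrapInput(Object incoming, ProvenanceTracker tracker, Set upstreamRuns) {
        if( incoming instanceof Msg ) {
            def origin = tracker.upstreamRunOf(incoming)
            if( origin != null )
                upstreamRuns << origin       // remember the run that produced this input
            return incoming.value            // hand the plain value to the task/operator logic
        }
        return incoming
    }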

Possible problems

The impact of creating an object instance for each output value generated by the workflow execution on the underlying Java heap should be assessed.

Similarly, keeping a heap reference for each task and operator run may create memory pressure on large workflow graphs.
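
As a rough back-of-envelope (the per-object cost is an assumption, not a figure from this PR): a thin wrapper typically occupies a few tens of bytes on the heap, so a workflow emitting one million output values would add only tens of megabytes of wrapper overhead. The second concern is likely the bigger one, since references held per task/operator run could keep intermediate values reachable and prevent them from being garbage-collected on large graphs.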

Current state and next steps

The current implementation demonstrates that this approach is viable. The solution already supports all tasks and the following operators: branch, map, flatMap, collectFile.

Tests are available for these cases.

The remaining operators should be added to fully support existing workflow applications.

Alternative solution

A simpler solution is possible: use the output file paths as the identity values to track task provenance, with logic very similar to the above proposal.

However, the path approach is limited to the case in which all workflow tasks and operators produce file values. Provenance cannot be tracked for tasks having one or more non-file input/output values.
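
For example (an invented snippet, not taken from this PR), a channel whose items are plain records has no output path that could serve as an identity:

    // Hypothetical workflow: the emitted items are maps, not files,
    // so there is no file path to use as a provenance identity.
    workflow {
        Channel
            .of( [id: 'sample1', count: 100], [id: 'sample2', count: 250] )
            .map { rec -> rec + [scaled: rec.count * 2] }
            .view()
    }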

pditommaso avatar Jan 05 '25 13:01 pditommaso

Deploy Preview for nextflow-docs-staging canceled.

Latest commit: 1cca1ae9203fc153a2297c1a658220a2788187ca
Latest deploy log: https://app.netlify.com/projects/nextflow-docs-staging/deploys/686a732d42132b0008d8c1b9

netlify[bot] avatar Jan 05 '25 13:01 netlify[bot]

All green!

pditommaso avatar Jan 14 '25 16:01 pditommaso

Great, I will try to review this week.

bentsherman avatar Jan 14 '25 16:01 bentsherman

An update on the status of this PR. The following operators are fully supported:

  • branch
  • buffer
  • concat
  • collect
  • collectFile
  • combine
  • count
  • distinct
  • filter
  • first
  • flatMap
  • flatten
  • groupTuple
  • ifEmpty
  • join
  • last
  • map
  • max
  • min
  • mix
  • mean
  • multiMap
  • reduce
  • take
  • toList
  • toSortedList
  • unique
  • until
  • sum

The most complex ones remaining to be supported are likely the splitter operators.

pditommaso avatar Jan 15 '25 13:01 pditommaso

This is why we need fewer operators 😆

The splitter operators should work similarly to flatMap
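
A hedged sketch of that similarity, reusing the illustrative names from the sketches above: both flatMap and the splitters take one message in and emit several, each of which needs its own wrapper pointing back at the same operator run.

    // Illustrative only: a scatter operator (flatMap or a splitter) fans one incoming
    // message out into many outputs, all attributed to the same operator run.
    List<Msg> fanOut(Msg incoming, Object operatorRun, ProvenanceTracker tracker, Closure splitter) {
        return splitter(incoming.value).collect { item -> tracker.wrapOutput(item, operatorRun) }
    }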

bentsherman avatar Jan 15 '25 13:01 bentsherman

I know, I know, but they exist

pditommaso avatar Jan 15 '25 13:01 pditommaso

@pditommaso Sorry for the accidental merge. I was reviewing a different pull request and clicked into the wrong tab. Super excited about this PR though. 🎉

kenibrewer avatar Jan 15 '25 19:01 kenibrewer

No pb. I'm going to override it

pditommaso avatar Jan 15 '25 19:01 pditommaso

For posterity, here is an example from fetchngs that would not be captured by file-based tracking:

https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/workflows/sra/main.nf#L54-L57

SRA_RUNINFO_TO_FTP outputs a csv file that is split into records using the splitCsv operator. These records are filtered and eventually passed to CUSTOM_SRATOOLSNCBISETTINGS:

https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf#L20

So fetchngs should be a good test case for identity-based tracking.
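
A simplified, self-contained sketch of that shape (the file name and column name are invented; this is not the actual fetchngs code): the items emitted by splitCsv are plain records with no backing file, so only identity-based tracking can link them back to the producing task.

    // Simplified illustration of the pattern described above
    workflow {
        Channel
            .fromPath( 'runinfo.csv' )                   // stand-in for the SRA_RUNINFO_TO_FTP output
            .splitCsv( header: true )                    // each item is a record (a Map), not a file
            .filter { record -> record.run_accession }   // filtered records flow on as plain values
            .view()
    }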

bentsherman avatar Jan 18 '25 01:01 bentsherman

After a first pass, I feel good about the overall approach. Most of the changes seem to be general cleanup, which is appreciated, and most of the new behavior is isolated into new packages, which should keep it easy to evolve. I only have some minor questions that we can discuss later.

The trickiest part is clearly the operators -- linking inputs to outputs correctly, wrapping/unwrapping values correctly, especially for the scatter and gather operators. I will want to dig into this bit to see if we can simplify anything, but even the current amount of overhead looks acceptable.

It looks like the memory impact will be manageable. The Msg wrapper itself shouldn't cost much, at most a few MB. My main concern was keeping lots of intermediate objects alive, but it looks like you avoided this by keeping only the object identity of Msg and not the reference itself. You do have to keep all task runs alive, which in truth I have not stress-tested, but nf-prov is already doing this, so no change there.

I'm curious to see how you handle groupTuple... should be similar to buffer/collate, just harder 😆

bentsherman avatar Jan 18 '25 04:01 bentsherman

I ripped the code from nf-prov to generate a mermaid diagram of the task graph using your provenance method.

Your rnaseq-nf toy pipeline works fine:

[image: mermaid diagram of the rnaseq-nf task graph]

I tried to run it against fetchngs, but the run hangs at the very end 😞

You should be able to reproduce it with:

make pack
./build/releases/nextflow-24.11.0-edge-dist run nf-core/fetchngs -r 1.12.0 -profile test,conda --outdir results

bentsherman avatar Jan 18 '25 20:01 bentsherman

Well done. I'll check fetchngs asap

pditommaso avatar Jan 19 '25 20:01 pditommaso

Almost complete. Only the following operators need to be reviewed:

  • cross
  • dump
  • merge
  • subscribe
  • transpose

There's also the following non-deterministic error to be investigated:

OperatorImplTest > testFilterWithValue FAILED
    org.spockframework.runtime.SpockTimeoutError at OperatorImplTest.groovy:85

pditommaso avatar Feb 09 '25 15:02 pditommaso