Identity-based provenance tracking
This PR implements the ability to trace the full provenance of a Nextflow pipeline, so that once a task execution completes, it reports the set of direct upstream tasks that originated one or more of its inputs.
How it works
Each output value that's emitted by a task or an operator is wrapped with an object instance. This makes it possible to assign to each emitted value a unique identity based on the underlying Java object identity.
Each object is associated with the corresponding task or operator run (i.e. TaskRun and OperatorRun).
Once the output value is received as an input by a task, the upstream task is determined by inspecting the output-run association table.
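The mechanism described above can be sketched as follows. This is an illustrative toy, not the actual Nextflow classes: the names `Msg`, `Run`, `emit`, and `upstreamOf` are assumptions, but the core idea — an identity-keyed association table from wrapper objects to the run that produced them — matches the description.

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class ProvenanceSketch {
    // Wrapper giving every emitted value a unique Java object identity
    static final class Msg {
        final Object value;
        Msg(Object value) { this.value = value; }
    }

    // Stand-in for a TaskRun / OperatorRun
    static final class Run {
        final String name;
        Run(String name) { this.name = name; }
    }

    // Output-run association table, keyed by object identity (==), not equals()
    static final Map<Msg, Run> sender = new IdentityHashMap<>();

    static Msg emit(Run run, Object value) {
        Msg msg = new Msg(value);   // wrap the output value
        sender.put(msg, run);       // remember which run produced it
        return msg;
    }

    static Run upstreamOf(Msg msg) { // resolve provenance on receive
        return sender.get(msg);
    }

    public static void main(String[] args) {
        Run task1 = new Run("task-1");
        Run task2 = new Run("task-2");
        // Two equal payloads from different runs stay distinguishable,
        // because the table is keyed by wrapper identity, not value equality
        Msg out1 = emit(task1, "hello");
        Msg out2 = emit(task2, "hello");
        System.out.println(upstreamOf(out1).name);  // task-1
        System.out.println(upstreamOf(out2).name);  // task-2
    }
}
```

Note the use of `IdentityHashMap`, which compares keys with `==`: two distinct wrappers carrying equal payloads map to their respective producers.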
Required changes
This approach requires enclosing each output value in a wrapper object and unwrapping it once it is received by the downstream task or operator, so that the corresponding operation is not altered.
The input unwrapping can easily be automated for both tasks and operators, because they share a common message receive interface.
However, the output wrapping requires modifying all Nextflow operators, because each of them has its own custom logic to produce outputs.
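The asymmetry above can be shown in a small sketch (illustrative names, not Nextflow's API): unwrapping happens once at the shared receive path, so operator logic sees plain values, while the re-wrapping of outputs is the step every operator must add by hand.

```java
import java.util.function.Function;

public class WrapCycle {
    static final class Msg {
        final Object value;
        Msg(Object v) { value = v; }
    }

    // Common receive interface: unwrapping can be automated here, once
    static Object receive(Object raw) {
        return raw instanceof Msg ? ((Msg) raw).value : raw;
    }

    // A map-like operator: unwrap input, apply the user logic, wrap output.
    // The final wrapping is the per-operator change that cannot be automated.
    static Msg mapOperator(Object input, Function<Object, Object> fn) {
        Object value = receive(input);
        return new Msg(fn.apply(value));
    }

    public static void main(String[] args) {
        Msg in = new Msg(3);
        Msg out = mapOperator(in, v -> (Integer) v * 2);
        System.out.println(out.value);  // 6
    }
}
```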
Possible problems
The impact on the Java heap of creating an object instance for each output value generated by the workflow execution should be assessed.
Similarly, keeping a heap reference for each task and operator run may cause memory pressure on large workflow graphs.
Current state and next steps
The current implementation demonstrates that this approach is viable. The solution already supports all tasks and the following operators: branch, map, flatMap, collectFile.
Tests are included for these cases.
The remaining operators should be added to fully support existing workflow applications.
Alternative solution
A simpler solution is possible using output file paths as the identity values, with tracking logic very similar to the above proposal.
However, the path-based approach is limited to the case in which all workflow tasks and operators produce file values. Provenance cannot be tracked for tasks having one or more non-file input/output values.
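For comparison, the path-based alternative might look roughly like this (names are hypothetical): the file path itself is the lookup key, so any value that is not a file simply has no provenance entry.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class PathProvenance {
    static final class Run {
        final String name;
        Run(String n) { name = n; }
    }

    // Producer table keyed by output file path instead of object identity
    static final Map<Path, Run> producer = new HashMap<>();

    static void recordOutput(Run run, Path file) {
        producer.put(file, run);
    }

    static Run upstreamOf(Object input) {
        // Non-file values fall through: their provenance is lost
        return input instanceof Path ? producer.get(input) : null;
    }

    public static void main(String[] args) {
        Run task1 = new Run("task-1");
        Path p = Path.of("results/sample.bam");
        recordOutput(task1, p);
        System.out.println(upstreamOf(p).name);           // task-1
        System.out.println(upstreamOf("a-string-value")); // null - untrackable
    }
}
```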
All green!
Great, I will try to review this week.
An update on the status of this PR. The following operators are fully supported:
- branch
- buffer
- concat
- collect
- collectFile
- combine
- count
- distinct
- filter
- first
- flatMap
- flatten
- groupTuple
- ifEmpty
- join
- last
- map
- max
- min
- mix
- mean
- multiMap
- reduce
- take
- toList
- toSortedList
- unique
- until
- sum
The most complex ones remaining to support are likely the splitter operators.
This is why we need fewer operators 😆
The splitter operators should work similarly to flatMap
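Treating a splitter like flatMap could look roughly like this toy sketch (illustrative names, not the real implementation): one input message fans out into many output messages, all associated with the same operator run in the identity table.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class SplitterSketch {
    static final class Msg {
        final Object value;
        Msg(Object v) { value = v; }
    }
    static final class Run {
        final String name;
        Run(String n) { name = n; }
    }

    static final Map<Msg, Run> sender = new IdentityHashMap<>();

    // Scatter: split a comma-separated record into one wrapped message
    // per field, each attributed to the same operator run
    static List<Msg> split(Run operatorRun, Msg input) {
        List<Msg> outputs = new ArrayList<>();
        for (String field : ((String) input.value).split(",")) {
            Msg out = new Msg(field);
            sender.put(out, operatorRun);  // all outputs share one run
            outputs.add(out);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Run splitRun = new Run("splitCsv-1");
        List<Msg> records = split(splitRun, new Msg("SRR1,SRR2,SRR3"));
        System.out.println(records.size());                  // 3
        System.out.println(sender.get(records.get(2)).name); // splitCsv-1
    }
}
```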
I know, I know, but they exist
@pditommaso Sorry for the accidental merge. I was reviewing a different pull request and clicked into the wrong tab. Super excited about this PR though. 🎉
No pb. I'm going to override it
For posterity, here is an example from fetchngs that would not be captured by file-based tracking:
https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/workflows/sra/main.nf#L54-L57
SRA_RUNINFO_TO_FTP outputs a csv file that is split into records using the splitCsv operator. These records are filtered and eventually passed to CUSTOM_SRATOOLSNCBISETTINGS:
https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf#L20
So fetchngs should be a good test case for identity-based tracking.
After a first pass, I feel good about the overall approach. Most of the changes seem to be general cleanup, which is appreciated, and most of the new behavior is isolated into new packages, which should keep it easy to evolve. I only have some minor questions that we can discuss later.
The trickiest part is clearly the operators -- linking inputs to outputs correctly, wrapping/unwrapping values correctly, especially for the scatter and gather operators. I will want to dig into this bit to see if we can simplify anything, but even the current amount of overhead looks acceptable.
It looks like the memory impact will be manageable. The Msg wrapper itself shouldn't cost much, at most a few MB. My main concern was keeping lots of intermediate objects alive, but it looks like you avoided this by keeping only the object identity of Msg and not the reference itself. You do have to keep all task runs alive, which in truth I have not stress-tested, but nf-prov is already doing this so no change there.
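A toy illustration of that memory point (not Nextflow code): recording only `System.identityHashCode(msg)`, a plain int, instead of the `Msg` reference means the wrapper and its payload stay collectible by the GC. Note this is a simplification, since real identity hash codes can collide.

```java
import java.util.HashMap;
import java.util.Map;

public class IdentityOnly {
    // Keyed by identity hash code (an int), not by the object itself,
    // so no strong reference pins the intermediate value in memory
    static final Map<Integer, String> producedBy = new HashMap<>();

    static void record(Object msg, String taskName) {
        producedBy.put(System.identityHashCode(msg), taskName);
    }

    static String upstreamOf(Object msg) {
        return producedBy.get(System.identityHashCode(msg));
    }

    public static void main(String[] args) {
        Object msg = new byte[1024 * 1024];   // a large intermediate value
        record(msg, "task-1");
        System.out.println(upstreamOf(msg));  // task-1
        msg = null;  // only the int key remains; the payload can be GC'd
    }
}
```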
I'm curious to see how you handle groupTuple... should be similar to buffer/collate, just harder 😆
I ripped the code from nf-prov to generate a mermaid diagram of the task graph using your provenance method.
Your rnaseq-nf toy pipeline works fine:
I tried to run against fetchngs, but the run hangs at the very end 😞
You should be able to reproduce it with:
```
make pack
./build/releases/nextflow-24.11.0-edge-dist run nf-core/fetchngs -r 1.12.0 -profile test,conda --outdir results
```
Well done. I'll check fetchngs asap
Almost complete. Only the following operators need to be reviewed:
- cross
- dump
- merge
- subscribe
- transpose
There's also the following non-deterministic error to be investigated:
```
OperatorImplTest > testFilterWithValue FAILED
    org.spockframework.runtime.SpockTimeoutError at OperatorImplTest.groovy:85
```