[FEATURE] Split DeepVariant process
Is your feature request related to a problem? Please describe
DeepVariant has three different stages: make_examples, call_variants and postprocess_variants.
Currently, all three are called from a single Nextflow process using DeepVariant's wrapper script run_deepvariant, a single command that runs all three stages. This feature request is for splitting it up, making it more practical to use a GPU for "call_variants".
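For context, the current module's script block boils down to a single wrapper invocation, roughly like this (simplified; the flags shown are standard run_deepvariant options, not the module's exact argument list):

```groovy
script:
"""
/opt/deepvariant/bin/run_deepvariant \\
    --ref=${fasta} \\
    --reads=${input} \\
    --model_type=WGS \\
    --output_vcf=${prefix}.vcf.gz \\
    --output_gvcf=${prefix}.g.vcf.gz \\
    --num_shards=${task.cpus}
"""
```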
The first stage, make_examples, is really a single-threaded job, but it is split into shards and invoked with GNU parallel to run $task.cpus jobs at a time.
Describe the solution you'd like
The tool could be split into three processes. The stages have very different resource requirements, and all take comparable amounts of run time.
- MAKE_EXAMPLES - can be implemented as a "small" task for a single shard. The number of shards can be configurable in the NF pipeline, and the actual shard ID passed to MAKE_EXAMPLES as a value input.
- CALL_VARIANTS - should collect all tfrecord files from MAKE_EXAMPLES and run as a single process instance. The main thing is making it possible to use a GPU for this job, but by default it should be "process_high", as it can alternatively use an arbitrary number of CPUs.
- POSTPROCESS_VARIANTS - appears to be a single-threaded job with modest memory requirements, so it could be "process_single".
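A minimal sketch of how the three processes could be wired together (all process and channel names here are hypothetical):

```groovy
// One small MAKE_EXAMPLES task per shard; the shard count is pipeline-configurable
def n_shards = 32

ch_examples_in = ch_bam_bai.combine( Channel.of( 0..<n_shards ) )  // [ meta, bam, bai, shard ]
MAKE_EXAMPLES ( ch_examples_in, ch_fasta, ch_fai, n_shards )

// Collect all examples tfrecords per sample into a single, GPU-capable call_variants
CALL_VARIANTS ( MAKE_EXAMPLES.out.examples.groupTuple() )

// Postprocess the variant calls into VCF/GVCF
POSTPROCESS_VARIANTS ( CALL_VARIANTS.out.tfrecord, ch_fasta, ch_fai )
```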
Describe alternatives you've considered
- Using the existing module, but enabling GPU support. The problem is that it wastes a lot of time running make_examples, which can't use the GPU. To make make_examples faster one can increase the CPUs, but call_variants doesn't really use much CPU if the GPU is enabled.
- Instead of splitting make_examples into separate single-threaded tasks, one could keep using GNU parallel and have a single task with process_high resources. I don't know which one is better.
Additional context
I may develop something like this to use internally. Posting here in case people have suggestions for how to make it better, and if it can be integrated in nf-core.
Cool! I've been thinking about this as well, and halfway implemented it locally.
Even though splitting make_examples into single-threaded tasks might be the cleanest solution, I'm thinking about people using stageInMode: copy on their clusters. I could copy the BAM files once to run on one node (1x48 cores), or copy the BAM files 48 times to the same node if running single-threaded tasks. It would also possibly send thousands of jobs to the scheduler, which might be a bit problematic.
Say you instead want to split the calling into 200 "shards" but run it in only 5 jobs; you could do something like:
def start = 0
def end   = 200 // "n shards"
def step  = 48  // should be the DeepVariant task.cpus

// Create a channel of shard-ID batches, one batch per job
Channel.from( start..(end - 1) )
    .collate( step )
    .map { it -> [ it ] }  // wrap each batch so combine treats it as one element
    .set { ch_shard_ids }

ch_bam_bai
    .combine( ch_shard_ids )
    .set { ch_make_examples }

MAKE_EXAMPLES ( ch_make_examples, ch_fasta, ch_fai, [[],[]], end )
This passes each job's batch of shard IDs, together with the BAM files, to MAKE_EXAMPLES.
ch_make_examples:
[[id:HG002_new, family_id:FAM, paternal_id:XXX, maternal_id:YYY, sex:2, phenotype:1], /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam, /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam.csi, /media/ssd_4tb/felix/projects/skierfe/work/46/acb44178ea91c1e87418a09735d69b/1.bed, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]]
[[id:HG002_new, family_id:FAM, paternal_id:XXX, maternal_id:YYY, sex:2, phenotype:1], /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam, /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam.csi, /media/ssd_4tb/felix/projects/skierfe/work/46/acb44178ea91c1e87418a09735d69b/1.bed, [48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95]]
[[id:HG002_new, family_id:FAM, paternal_id:XXX, maternal_id:YYY, sex:2, phenotype:1], /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam, /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam.csi, /media/ssd_4tb/felix/projects/skierfe/work/46/acb44178ea91c1e87418a09735d69b/1.bed, [96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143]]
[[id:HG002_new, family_id:FAM, paternal_id:XXX, maternal_id:YYY, sex:2, phenotype:1], /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam, /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam.csi, /media/ssd_4tb/felix/projects/skierfe/work/46/acb44178ea91c1e87418a09735d69b/1.bed, [144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191]]
[[id:HG002_new, family_id:FAM, paternal_id:XXX, maternal_id:YYY, sex:2, phenotype:1], /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam, /media/ssd_4tb/felix/projects/skierfe/work/0a/9af09fbde2fdfdae694cdd1746dd0e/HG002_new.bam.csi, /media/ssd_4tb/felix/projects/skierfe/work/46/acb44178ea91c1e87418a09735d69b/1.bed, [192, 193, 194, 195, 196, 197, 198, 199]]
And in make_examples.nf, where ${n_tasks} is the batch of shard IDs (from ch_shard_ids above):
input:
tuple val(meta), path(input), path(index), path(intervals), val(tasks)
...
// n_tasks: space-separated shard IDs for this job, e.g. "0 1 2 ... 47".
// ${end} (the total shard count), ${fasta} and ${regions} come from the inputs elided above.
def n_tasks = tasks.join(' ')
"""
parallel -j ${task.cpus} --halt 2 --line-buffer eval /opt/deepvariant/bin/make_examples \\
    --mode calling \\
    --ref ${fasta} \\
    --reads ${input} \\
    --examples ./make_examples.tfrecord@${end}.gz \\
    --add_hp_channel \\
    --alt_aligned_pileup diff_channels \\
    --gvcf ./gvcf.tfrecord@${end}.gz \\
    --max_reads_per_partition "600" \\
    --min_mapping_quality 1 \\
    --parse_sam_aux_fields \\
    --partition_size 25000 \\
    --phase_reads \\
    --pileup_image_width 199 \\
    --norealign_reads \\
    --sample_name ${meta.id} \\
    --sort_by_haplotypes \\
    --track_ref_reads \\
    --vsc_min_fraction_indels 0.12 ${regions} --task {} ::: ${n_tasks}
"""
But it feels a bit hacky.
You could also decide to allow only one make_examples job (instead of 5 as in the example) per call_variants, to make things simpler.
Then we could just set ${n_tasks} to count from 0 to task.cpus - 1, and ${end} to task.cpus, within the process, skipping the whole channel manipulation. This might be the better option. You could still supply a BED file to split the calling per chromosome, and that might be enough.
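A minimal sketch of that simplification, inside the process script block (no channel manipulation needed):

```groovy
// Derive the shard IDs directly from the allocated CPUs
def end     = task.cpus                   // total number of shards
def n_tasks = (0..<task.cpus).join(' ')   // "0 1 2 ... task.cpus-1"
```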
Otherwise:
- As of 1.6 I think postprocess_variants is multithreaded.
- I guess we would also have to handle the different model type presets manually within make_examples, and keep track of that when updating the versions.
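For illustration, such manual preset handling could look like the map below; the flag sets are assumptions mirroring what run_deepvariant logs for each --model_type, and would have to be re-checked on every version bump (params.deepvariant_model is a hypothetical parameter):

```groovy
// Hypothetical per-model flag presets for make_examples
def make_examples_presets = [
    wgs    : '--channels insert_size',
    wes    : '--channels insert_size',
    pacbio : '--add_hp_channel --alt_aligned_pileup diff_channels --norealign_reads ' +
             '--phase_reads --pileup_image_width 199 --sort_by_haplotypes --track_ref_reads ' +
             '--vsc_min_fraction_indels 0.12'
]
def model_args = make_examples_presets[ params.deepvariant_model ?: 'wgs' ]
```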
Thanks for the comments - it's a great point about stageInMode: copy - it would be very inefficient to have many make_examples jobs. And as you say, it floods the scheduler, but in my case it fills up executor.queueSize first. Then it won't submit any of the calling jobs until most of the make_examples jobs are done, for all samples and intervals.
I think I will change it to have a single make_examples using parallel, to start with. Like you say, it will make things a lot simpler!
I don't know what to do about different models. I'll use the WGS model and the same parameters as are used in the current process, and then try to improve it later.
I've replaced the DEEPVARIANT process with a workflow comprising the three stages. I'm not sure if nf-core allows having a workflow in a module like that. Here's the current version: https://github.com/fa2k/modules/blob/split-deepvariant/modules/nf-core/deepvariant/main.nf
The dream was to have a drop-in replacement, but that's not possible: most workflows have custom configuration in conf/modules that will have to change due to the different process names.
Even though the stage-in behaviour is a lot better now than it could be (see above), it still requires staging the tfrecord files multiple times, which the previous version didn't need. (Only relevant with stageInMode: copy.)
I'm not sure if this idea is worth continuing, and whether I should try to polish it up as a new module version and PR. Any comments on whether the benefits outweigh the mentioned downsides?
Looks promising!
> I've replaced the DEEPVARIANT process with a workflow comprising the three stages. I'm not sure if nf-core allows having a workflow in a module like that. Here's the current version: https://github.com/fa2k/modules/blob/split-deepvariant/modules/nf-core/deepvariant/main.nf
Could we rather than replacing the module, add three separate modules (deepvariant/make_examples, deepvariant/call_variants, deepvariant/postprocess_variants), and then make a deepvariant subworkflow to combine the three and use that as a replacement for the current process?
> Even though the stage-in behaviour is a lot better now than it could be (see above), it still requires staging the tfrecord files multiple times, which the previous version didn't need. (Only relevant with stageInMode: copy.)
I'm not sure how big the tfrecord files are, and now it's only one process, so it might not be a problem.
> I'm not sure if this idea is worth continuing, and whether I should try to polish it up as a new module version and PR. Any comments on whether the benefits outweigh the mentioned downsides?
I think it would be worth it: DeepVariant is one of the most time- and resource-consuming processes in the pipelines, and it would be great if we could reduce that. Even though it won't be a drop-in replacement and will require a little bit of work from the pipeline maintainers, it would be great to have a subworkflow with a test asserting that it produces the same files as the current process.
Yes, making a subworkflow seems more aligned with other things in nf-core, and changes are needed in the workflows no matter what. I'm off this week, and will try to split it as you suggest next week.
I've made a draft solution based on splitting the subcommands of DeepVariant into separate modules. These "subcommands" are the commands seen in the log of the run_deepvariant script used in the current module. There is a problem with this code (see below), but I'm adding it as a draft PR to show the status:
https://github.com/nf-core/modules/pull/5990
There are three processes:
- DEEPVARIANT_MAKEEXAMPLES
  - Inputs: CRAM/BAM; reference genome; intervals
  - Outputs: two sets of tfrecords, "examples" and "gvcf". The gvcf records are less than 10% of the size. The number of tfrecords of each kind equals $task.cpus.
- DEEPVARIANT_CALLVARIANTS
  - Inputs: examples tfrecords; model name
  - Outputs: variant calls tfrecord (a single file)
- DEEPVARIANT_POSTPROCESSVARIANTS
  - Inputs: variant calls tfrecord; gvcf tfrecords; reference genome
  - Outputs: VCF file, GVCF file
The subworkflow inputs are similar to the current DEEPVARIANT module, but it adds a value input to specify the model, e.g. "wgs". I attempt to avoid unnecessary staging of the gvcf.tfrecord files. These are produced by MAKEEXAMPLES, but are only used for POSTPROCESSVARIANTS, so they are not propagated through CALLVARIANTS. This is why I use a channel join, which admittedly does add some complexity. If you all prefer, the join could be removed by instead passing the gvcf.tfrecord files through CALLVARIANTS.
The subworkflow has to create a unique ID to use for joining the channels for input to POSTPROCESSVARIANTS. The unique ID is the task hash of MAKEEXAMPLES. It's not possible to join on just meta, because pipelines like sarek will have the same meta with multiple different intervals.
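A rough sketch of that join (channel names, output names and the meta key are illustrative, not the exact code in the PR):

```groovy
// Key both channels on the unique per-task ID so that the variant calls and the
// gvcf tfrecords can be re-associated; joining on meta alone would be ambiguous
ch_calls = DEEPVARIANT_CALLVARIANTS.out.tfrecord
    .map { meta, calls -> [ meta.make_examples_id, meta, calls ] }

ch_gvcf = DEEPVARIANT_MAKEEXAMPLES.out.gvcf
    .map { meta, gvcfs -> [ meta.make_examples_id, gvcfs ] }

ch_calls
    .join( ch_gvcf )
    .map { id, meta, calls, gvcfs -> [ meta, calls, gvcfs ] }
    .set { ch_postprocess_in }

DEEPVARIANT_POSTPROCESSVARIANTS ( ch_postprocess_in, ch_fasta, ch_fai )
```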
Example of using a GPU, with Singularity:
withName: "DEEPVARIANT_CALLVARIANTS" {
    containerOptions = '--nv'
    container = '/data/nobackup/nsc/raredisease-configs/singularity/docker.io-google-deepvariant-1.5.0-gpu.img'
    cpus = 6
}
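For Docker instead of Singularity, the equivalent GPU pass-through would presumably be (an assumption, not tested in this thread):

```groovy
withName: "DEEPVARIANT_CALLVARIANTS" {
    containerOptions = '--gpus all'  // Docker's GPU flag, vs. Singularity's --nv
}
```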
Blocking issue - I need some help please:
- There is no way to get intervals.baseName in POSTPROCESSVARIANTS, because the intervals are not propagated; they are only needed for MAKEEXAMPLES. Sarek uses intervals.baseName as part of the prefix, to give each interval's VCF files a unique name. It seems a bit ugly to propagate the intervals through all the processes when they aren't needed, but I could do it. Or sarek could use make_examples_id in the name instead, but then it's not as easy to see from the filename what it is - just a long hash.
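One possible middle ground (just a sketch, not what the PR does): record the interval name in meta before MAKEEXAMPLES, so POSTPROCESSVARIANTS can build a readable prefix without ever staging the intervals file:

```groovy
// Hypothetical: carry the interval name in meta from the start, so downstream
// processes can use it in output prefixes without receiving the intervals file
ch_make_examples_in = ch_input.map { meta, bam, bai, intervals ->
    [ meta + [ interval_name: intervals ? intervals.baseName : '' ], bam, bai, intervals ]
}
```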
Limitations:
- Nextflow will submit all the MAKEEXAMPLES jobs it can before any CALLVARIANTS jobs. If there are many jobs and submission is limited by executor.queueSize, the GPU won't be utilised until all the MAKEEXAMPLES jobs have been submitted to the scheduler. Sarek is likely to run into this, as it splits the samples by intervals, producing many jobs.
I really like what you did. I'd say it's time to think properly about how to do this nicely and propagate it through the pipelines that need it. I'll have a more in-depth look and will get back to you ASAP.
As linked above, I made a new attempt at a pull request, which keeps the existing module as deepvariant/rundeepvariant and uses the new DeepVariant version (requiring some changes to the intermediate file names). The model type value channel is removed; instead, the model type is configured in the config.
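For illustration, configuring the model type could look something like this; the exact option and model path are assumptions (call_variants does take a model directory via its --checkpoint flag), not necessarily what the PR uses:

```groovy
withName: "DEEPVARIANT_CALLVARIANTS" {
    ext.args = '--checkpoint "/opt/models/wgs"'  // hypothetical: select the WGS model
}
```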