snakemake-wrappers icon indicating copy to clipboard operation
snakemake-wrappers copied to clipboard

Add a joint-wrapper which parallelizes SAMTOOLS MPILEUP/VARSCAN MPILEUP2SNP

Open moldach opened this issue 3 years ago • 7 comments

There are currently two separate wrappers for SAMTOOLS MPILEUP and VARSCAN MPILEUP2SNP

These tools are used sequentially and unfortunately single-threaded.

I'm in the process of converting shell commands to wrappers so I have not had a chance to benchmark these wrappers specifically; however, I assume it is the same as piping the output of samtools mpileup into varscan, e.g.:

samtools mpileup -f $ref ${array_bam[$j]{"bam"}} |\\
    java -jar /home/${user}/projects/def-mtarailo/common/tools/VarScan.v2.3.9.jar pileup2snp \\
    --variants \\
    --min-coverage $mincov \\
    --min-avg-qual $minqual \\
    --min-var-freq $minfreq > ${path}/calling/varscan/${array_bam[$j]{"id"}}_snp_varscan.vcf ;

When I initially benchmarked the above code on C. elegans it took 101 minutes.

It would be ideal to create a joint-wrapper, combining the two tools, taking advantage of samtools mpileup's --region parameter and GNU Parallel.

The shell command I'm currently using is:

rule variant_calling:
    input:
        ref = os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"]),
        bam = lambda wildcards: getDeduppedBams(wildcards.sample),
        bam_index = lambda wildcards: getDeduppedBamsIndex(wildcards.sample)
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")
    log: os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}.log")
    resources:
        mem = 10000,
        time = 90
    threads: 7
    params:
        sample = "{sample}"
    message: """--- Varscan pileup2snp."""
    shell: """
        module load samtools;
        module load java/13.0.1;
        echo -n "I II III IV V X MtDNA" |\
        xargs -d " " -n 1 -P 7 -I {{}} /bin/bash -c \
        "samtools mpileup -r {{}} \
        -f ~/projects/def-mtarailo/common/indexes/WS265_wormbase/{{}}.fa \
        {input.bam} |\
        java -Xmx5G -jar ~/projects/def-mtarailo/common/tools/VarScan.v2.3.9.jar pileup2snp \
        --variants \
        --min-coverage 5 \
        --min-avg-qual 30 \
        --min-var-freq 0.9 > {params.sample}_{{}}.vcf"
        awk 'FNR==1 && NR!=1 {{ while (/^<header>/) getline; }} 1 {{print}} ' *.vcf > {output}

        rm {params.sample}_I.vcf {params.sample}_II.vcf {params.sample}_III.vcf {params.sample}_IV.vcf {params.sample}_V.vcf {params.sample}_X.vcf {params.sample}_MtDNA.vcf
        """

This parallelized the variant calling process by applying these operations to each chromosome (on a separate core) reducing computation time to 17 minutes - A 81% decrease in processing time for the lengthiest step in the C. elegans pipeline.

moldach avatar Aug 25 '20 20:08 moldach

Hi @moldach ,

seems like parallelizing over chromosomes is really an enhancement here. Did you think about using the chromosomes as a wildcard? Then, snakemake would run every chromosome in a different job, without the need of xargs and the likes.

Additionally, you could also pipe the output from samtools mpileup to varscan pileup2snp between rules, so you can actually use a separate rule for each of those two steps.

jafors avatar Aug 26 '20 07:08 jafors

Hi @jafors thanks for getting back to me.

I've tried your suggestion of pipe the output from samtools mpileup to varscan pileup2snp between rules and adding another rule to combine the output of all these

rule mpilup_I:
    input:
	bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.mpileup.gz")
    log:
        os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_I_samtools_mpileup.log")
    params:
	extra="-r I",  # optional
    resources:
	mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

rule mpileup_to_vcf_I:
    input:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.mpileup.gz")
    output:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.vcf")
    message:
	"Calling SNP with Varscan2"
    threads:
	2 # Keep threading value to one for unzipped mpileup input
          # Set it to two for zipped mipileup files
    log:
        os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"varscan_{sample}_I.log")
    resources:
	mem = 1000,
        time = 30
    wrapper:
	"0.65.0/bio/varscan/mpileup2snp"

  ...

and then combine the output of these rules at the end

rule vcf_merge:
    input:
        I = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.vcf"),
        II = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_II.vcf"),
        III = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_III.vcf"),
        IV = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_IV.vcf"),
	V = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_V.vcf"),
        X = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_X.vcf"),
        MtDNA = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_MtDNA.vcf")
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")
    log: os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_vcf-merge.log")
    resources:
        mem = 1000,
	time = 10
    threads: 1
    message: """--- Merge VarScan by Chromosome."""
    shell: """
	awk 'FNR==1 && NR!=1 {{ while (/^<header>/) getline; }} 1 {{print}} ' {input.I} {input.II} {input.III} {input.IV} {input.V} {input.X} {input.MtDNA} > {output}
        """

We don't have to list the temporary files in rule all - just the final merged .vcf:

rule all:
    input:
        expand('{CALLING_DIR}/{CALLING_TOOL}/{sample}_sorted_dedupped_snp_varscan.{ext}', CALLING_DIR=dirs_dict["CALLING_DIR"], CALLING_TOOL=config["CALLING_TOOL"], sample=sample_names, ext=['vcf']),

For C. elegans it produces a DAG, like:

(snakemake) [moldach@arc wrappers]$ snakemake --use-conda --profile slurm --jobs 13 
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cluster nodes: 13
Job counts:
	count	jobs
	1	all
	1	mpileup_to_vcf_I
	1	mpileup_to_vcf_II
	1	mpileup_to_vcf_III
	1	mpileup_to_vcf_IV
	1	mpileup_to_vcf_MtDNA
	1	mpileup_to_vcf_V
	1	mpileup_to_vcf_X
	1	mpilup_I
	1	mpilup_II
	1	mpilup_III
	1	mpilup_IV
	1	mpilup_MtDNA
	1	mpilup_V
	1	mpilup_X
	1	vcf_merge
	16

However, forever the Human genome this would require many more rules.

Is there a way to do this more succinctly?

moldach avatar Sep 15 '20 16:09 moldach

There is indeed a way to improve on this when you replace the specific contigs with a wildcard. You can expand in the merging rule:

input:
   variants=expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"], "{{sample}}_{contig}.vcf"), contig=["I", "II"]) # and so on
output:
    os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")

Then, you need the other mpileup and mpileup_to_vcf only once, replacing the "I" with {contig}. In the params of rule mpileup, you can do:

params:
    extra=lambda wc: "-r {}".format(wc.contig)

jafors avatar Sep 15 '20 18:09 jafors

Thanks for the suggestion.

I've tried the following but get an error:

rule mpilup:
    input:
        bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_{contig}_samtools_mpileup.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    params:
        extra=lambda wc: "-r {}".format(wc.contig)
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

rule mpileup_to_vcf:
    input:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.vcf"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    message:
        "Calling SNP with Varscan2"
    threads:
        2 # Keep threading value to one for unzipped mpileup input
          # Set it to two for zipped mipileup files
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"varscan_{sample}_{contig}.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/varscan/mpileup2snp"

Error

(snakemake) [moldach@arc wrappers]$ snakemake -n -r
Workflow defines that rule get_vep_cache is eligible for caching between workflows (use the --cache argument to enable this).
Building DAG of jobs...
InputFunctionException in line 341 of /home/moldach/wrappers/Snakefile:
Error:
  AttributeError: 'Wildcards' object has no attribute 'sample'
Wildcards:

Traceback:
  File "/home/moldach/wrappers/Snakefile", line 343, in <lambda>

Any idea what could be wrong?

moldach avatar Sep 16 '20 03:09 moldach

vim +341 Snakefile takes me to:

rule mpilup:
    input:
        bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_{contig}_samtools_mpileup.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    params:
        extra=lambda wc: "-r {}".format(wc.contig)
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

moldach avatar Sep 16 '20 03:09 moldach

There is no need to expand in the two mpileup-related rules. Those rules will run once for all contigs over all samples. Just write

rule mpileup_to_vcf:
    input:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz")
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.vcf")

and the contigs will be determined by the expand() in rule vcf_merge similar to the samples which are determined by the expand() in rule all. In rule mpileup, you do the same and remove the expand from the output and the log fields.

jafors avatar Sep 16 '20 13:09 jafors

Hi @jafors thank you very much for you help. This works wonderfully.

However, I've noticed that the output of these two rules creates downstream issues with another rule #167

moldach avatar Sep 16 '20 18:09 moldach