
Depth-first execution.

Open mortunco opened this issue 7 months ago • 5 comments

Snakemake versions:

  • 7.24.0 and 8.1.0

Is your feature request related to a problem? Please describe. PROBLEM: I have a pipeline with 2 rules: download fastq and align. I want to process one sample at a time, or at least prioritize depth-first processing. My pipeline keeps processing them breadth first (it downloads all fastqs, then starts aligning them). I have read a few previously opened issues but I see no change.

Describe the solution you'd like I am more worried that I am doing something wrong, but maybe a depth-prioritized option?

ATTEMPTS: I tried multiple things before writing here. I used the following command to launch my workflow: snakemake --snakefile test.snakemake2 --configfile test.yaml --cores 9 --resources mem_mb=200 --forceall -n --prioritize STARsolo_automatic

As suggested in previously opened issues:

  • priority: didn't work (this)
  • using the most up-to-date Snakemake (the solver is ILP): didn't help (this)
  • resource allocation: I limited total memory to 200 and gave 120 to download_fastq and 80 to the alignment rule.
  • In both 7.24 and 8.1.0 the proposed solutions behave the same way, so this is not working either. Previous issue here

I would appreciate any help or guidance.

Thank you very much for developing and maintaining snakemake.

Best,

T.

snakefile

def get_results():
    out_list=[]
    for sample in config["samples"]:
        out_list.append(f"results/{config['project']}/{sample}/STAR/output/Velocyto/filtered")
    return out_list

def gather_all_fastqs(wildcards):
    out_list=[]
    for srr in config["samples"][wildcards.sample]:
        out_list.append(f"raw-data/{wildcards.project}/{wildcards.sample}/{srr}_1.fastq")
        out_list.append(f"raw-data/{wildcards.project}/{wildcards.sample}/{srr}_2.fastq")
    print(out_list)
    return out_list


rule all:
    input: get_results()
    
rule download_fastq:
    output:
        R1_fastq=temp("raw-data/{project}/{sample}/{SRR}_1.fastq"),
        R2_fastq=temp("raw-data/{project}/{sample}/{SRR}_2.fastq")
    threads:
        8
    resources:
        mem_mb=120
    log:
        "raw-data/{project}/{sample}/log.download.{SRR}"
    shell:
        """
        touch {output.R1_fastq}
        touch {output.R2_fastq}
        """

rule STARsolo_automatic:
    input:
        gather_all_fastqs,
        # y="raw-data/{project}/{sample}/fastq_check_done"
    output: 
        directory("results/{project}/{sample}/STAR/output/Velocyto/filtered")
    log: 
        "results/{project}/{sample}/log.STAR.txt"
    threads:
        8
    priority:
        50
    resources:
        mem_mb=80
    benchmark:
        "results/{project}/{sample}/benchmark.STAR.txt"
    shell:
        """
        mkdir -p {output}
        """

test.yaml

project: "testproject-001"
samples:
  GSM3148577_BC10_TUMOR1:
    - "SRR7191904"
    - "SRR7191905"
  sample2:
    - "SRR7191904"
  sample3:
    - "SRR7191904"
  sample4:
    - "SRR7191904"

mortunco avatar Jan 08 '24 23:01 mortunco

If you want to process one sample at a time, use --jobs 1; Snakemake should then run STARsolo_automatic right after download_fastq for each sample. If you use --cores 9 or --jobs 9 and you have fewer than 9 samples, then Snakemake will download all fastqs first and then align them.

If this doesn't help, try to simplify your example in such a way that people can easily reproduce it.
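
For example, something along these lines (just your own command with the scheduling flags reduced to --jobs 1; untested):

snakemake --snakefile test.snakemake2 --configfile test.yaml --jobs 1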

dariober avatar Jan 09 '24 11:01 dariober

Hi, you could try setting a disk storage limitation in Snakemake.

Here are the related docs: https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#standard-resources

Here is a related test: https://github.com/snakemake/snakemake/blob/6175ea380b92db0fa9956c3b9b57d6e911164b74/tests/test_inferred_resources/Snakefile
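
As a rough sketch of the resource idea (using a custom resource instead of the standard disk_mb; download_slots is an invented name, not a Snakemake built-in, and this only caps concurrent downloads rather than enforcing true depth-first order):

# download_slots is an arbitrary custom resource; it is only enforced if a
# global limit is passed on the command line, e.g.
#   snakemake --cores 9 --resources download_slots=1
rule download_fastq:
    output:
        R1_fastq=temp("raw-data/{project}/{sample}/{SRR}_1.fastq"),
        R2_fastq=temp("raw-data/{project}/{sample}/{SRR}_2.fastq")
    resources:
        download_slots=1   # each download job claims the single slot
    shell:
        """
        touch {output.R1_fastq}
        touch {output.R2_fastq}
        """

Combined with a higher priority on the alignment rule, this should keep downloads from running far ahead of the aligner.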

Hocnonsense avatar Jan 09 '24 17:01 Hocnonsense

Dear @dariober, thank you very much for your suggestion. I think I am getting to the bottom of my problem.

snakemake --snakefile code/velo-db-utils/test.snakemake2 --jobs 1

When I execute it, it runs as you suggested. Below you can see me actually executing it and then dry-running it. I remember trying --jobs 1 before and not getting what I wanted. I think the reason is that the real execution actually runs in the order I wanted, whereas the dry run simply lists rule1 rule1 rule1 > rule2 rule2 rule2. The behaviour is the same for v7.24 and v8.1. That leads me to an ignorant question: is there a way I can see the order of execution? The DAG does not show the order, and while I can read the dry run, you can see the problem below. The jobid does not seem to reflect the execution order either.

Dear @Hocnonsense, I previously played with the disk_mb setting in resources but it did not help, though maybe I was not paying attention to the order.

Thank you very much to both of you.

Toy snakefile

samples=["a","b","c"]

rule all:
    input: 
        expand("results/test/final_result_{data}",data=samples)
    
rule rule1:
    output:
        temp("results/test/int_result_{sample}")
    threads:
        8
    resources:
        disk_mb=100
    priority:
        1
    shell:
        """
        touch {output}
        sleep 5 
        """

rule rule2:
    input:
        "results/test/int_result_{sample}"
    output: 
        "results/test/final_result_{sample}"
    threads:
        8
    priority:
        50
    resources:
        disk_mb=100
    shell:
        """
        touch {output}
        sleep 5
        """

execution

$ snakemake --snakefile code/velo-db-utils/test.snakemake2 --jobs 1
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 1 (use --cores to define parallelism)
Rules claiming more threads will be scaled down.
Job stats:
job      count    min threads    max threads
-----  -------  -------------  -------------
all          1              1              1
rule1        3              1              1
rule2        3              1              1
total        7              1              1

Select jobs to execute...

[Tue Jan  9 17:32:48 2024]
rule rule1:
    output: results/test/int_result_a
    jobid: 2
    reason: Missing output files: results/test/int_result_a
    wildcards: sample=a
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:32:53 2024]
Finished job 2.
1 of 7 steps (14%) done
Select jobs to execute...

[Tue Jan  9 17:32:53 2024]
rule rule2:
    input: results/test/int_result_a
    output: results/test/final_result_a
    jobid: 1
    reason: Missing output files: results/test/final_result_a; Input files updated by another job: results/test/int_result_a
    wildcards: sample=a
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:32:58 2024]
Finished job 1.
2 of 7 steps (29%) done
Removing temporary output results/test/int_result_a.
Select jobs to execute...

[Tue Jan  9 17:32:58 2024]
rule rule1:
    output: results/test/int_result_b
    jobid: 4
    reason: Missing output files: results/test/int_result_b
    wildcards: sample=b
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:33:03 2024]
Finished job 4.
3 of 7 steps (43%) done
Select jobs to execute...

[Tue Jan  9 17:33:03 2024]
rule rule2:
    input: results/test/int_result_b
    output: results/test/final_result_b
    jobid: 3
    reason: Missing output files: results/test/final_result_b; Input files updated by another job: results/test/int_result_b
    wildcards: sample=b
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:33:08 2024]
Finished job 3.
4 of 7 steps (57%) done
Removing temporary output results/test/int_result_b.
Select jobs to execute...

[Tue Jan  9 17:33:08 2024]
rule rule1:
    output: results/test/int_result_c
    jobid: 6
    reason: Missing output files: results/test/int_result_c
    wildcards: sample=c
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:33:13 2024]
Finished job 6.
5 of 7 steps (71%) done
Select jobs to execute...

[Tue Jan  9 17:33:13 2024]
rule rule2:
    input: results/test/int_result_c
    output: results/test/final_result_c
    jobid: 5
    reason: Missing output files: results/test/final_result_c; Input files updated by another job: results/test/int_result_c
    wildcards: sample=c
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

[Tue Jan  9 17:33:18 2024]
Finished job 5.
6 of 7 steps (86%) done
Removing temporary output results/test/int_result_c.
Select jobs to execute...

[Tue Jan  9 17:33:18 2024]
localrule all:
    input: results/test/final_result_a, results/test/final_result_b, results/test/final_result_c
    jobid: 0
    reason: Input files updated by another job: results/test/final_result_a, results/test/final_result_c, results/test/final_result_b
    resources: tmpdir=/tmp

[Tue Jan  9 17:33:18 2024]
Finished job 0.
7 of 7 steps (100%) done
Complete log: .snakemake/log/2024-01-09T173247.474301.snakemake.log

dry-run

(snakemake2023) $ cat snakeout
Building DAG of jobs...
Job stats:
job      count    min threads    max threads
-----  -------  -------------  -------------
all          1              1              1
rule1        3              1              1
rule2        3              1              1
total        7              1              1


[Tue Jan  9 17:35:53 2024]
rule rule1:
    output: results/test/int_result_b
    jobid: 4
    reason: Forced execution
    wildcards: sample=b
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96


[Tue Jan  9 17:35:53 2024]
rule rule1:
    output: results/test/int_result_c
    jobid: 6
    reason: Forced execution
    wildcards: sample=c
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96


[Tue Jan  9 17:35:53 2024]
rule rule1:
    output: results/test/int_result_a
    jobid: 2
    reason: Missing output files: results/test/int_result_a
    wildcards: sample=a
    priority: 1
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96


[Tue Jan  9 17:35:53 2024]
rule rule2:
    input: results/test/int_result_c
    output: results/test/final_result_c
    jobid: 5
    reason: Input files updated by another job: results/test/int_result_c
    wildcards: sample=c
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96


[Tue Jan  9 17:35:53 2024]
rule rule2:
    input: results/test/int_result_a
    output: results/test/final_result_a
    jobid: 1
    reason: Input files updated by another job: results/test/int_result_a
    wildcards: sample=a
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96


[Tue Jan  9 17:35:53 2024]
rule rule2:
    input: results/test/int_result_b
    output: results/test/final_result_b
    jobid: 3
    reason: Input files updated by another job: results/test/int_result_b
    wildcards: sample=b
    priority: 50
    resources: tmpdir=/tmp, disk_mb=100, disk_mib=96

Would remove temporary output results/test/int_result_c
Would remove temporary output results/test/int_result_a
Would remove temporary output results/test/int_result_b

[Tue Jan  9 17:35:53 2024]
localrule all:
    input: results/test/final_result_a, results/test/final_result_b, results/test/final_result_c
    jobid: 0
    reason: Input files updated by another job: results/test/final_result_b, results/test/final_result_a, results/test/final_result_c
    resources: tmpdir=/tmp

Job stats:
job      count    min threads    max threads
-----  -------  -------------  -------------
all          1              1              1
rule1        3              1              1
rule2        3              1              1
total        7              1              1

Reasons:
    (check individual jobs above for details)
    forced:
        rule1
    input files updated by another job:
        all, rule2
    missing output files:
        rule1

This was a dry-run (flag -n). The order of jobs does not reflect the order of execution.

mortunco avatar Jan 09 '24 17:01 mortunco

Sorry for my last reply; the disk_mb parameter seems to be meant for cluster jobs. However, you can set the priority of rule 2 ten times higher than that of rule 1, so that once rule 1 for sample=a is done, rule 2 for sample=a is selected before any other rule 1 job.

Hocnonsense avatar Jan 09 '24 17:01 Hocnonsense

My solution to this was to use input functions to create a "linear" DAG.

If you have a workflow A->B->C and, for example, sample_1 and sample_2 to process, you can write an input function for rule A that takes wildcards as its argument and:

  • if the sample is the first sample you want to process, returns None/an empty string;
  • if the sample is not the first sample, returns the path to rule C's output for the previous sample.

In this case the input function would return nothing for sample_1, and for sample_2 it would return sample_1's rule C output. This forces the DAG to run A->B->C for sample_1 before running A->B->C for sample_2. This also works for chunking: in the "header" of my Snakefile I iterate over the list of final files to make, create sublists of N files (N being the batch size), and assign them to a dict of key:sublist, where the key is something I can provide via wildcards in the relevant rule. Then an input like lambda wildcards: filesdict[wildcards.something] gets those files as input for a job and causes batched execution. Both variants are sketched below.
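
A minimal sketch of the chained input function (all names here, rule_A/rule_B/rule_C, SAMPLES and the results/ paths, are placeholders for this example, not taken from the workflow above; an empty list is returned instead of an empty string):

SAMPLES = ["sample_1", "sample_2", "sample_3"]

def previous_sample_done(wildcards):
    # No extra input for the first sample; every later sample additionally
    # requires rule_C's output of the previous sample, which serializes the
    # A->B->C chains across samples.
    i = SAMPLES.index(wildcards.sample)
    return [] if i == 0 else [f"results/{SAMPLES[i - 1]}/C.done"]

rule all:
    input:
        expand("results/{sample}/C.done", sample=SAMPLES)

rule rule_A:
    input:
        previous_sample_done          # empty for the first sample
    output:
        temp("results/{sample}/A.out")
    shell:
        "touch {output}"

rule rule_B:
    input:
        "results/{sample}/A.out"
    output:
        temp("results/{sample}/B.out")
    shell:
        "touch {output}"

rule rule_C:
    input:
        "results/{sample}/B.out"
    output:
        "results/{sample}/C.done"
    shell:
        "touch {output}"

And one possible way to wire up the batching variant, showing only the pieces that change relative to the sketch above (BATCH_SIZE, batch_of, batch_targets and previous_batch_done are invented names):

SAMPLES = [f"sample_{i}" for i in range(1, 9)]
BATCH_SIZE = 3

# Map each sample to a batch index (batches of 3: 0,0,0,1,1,1,2,2).
batch_of = {s: i // BATCH_SIZE for i, s in enumerate(SAMPLES)}

# The key:sublist dict described above: final outputs grouped per batch.
batch_targets = {}
for s in SAMPLES:
    batch_targets.setdefault(batch_of[s], []).append(f"results/{s}/C.done")

def previous_batch_done(wildcards):
    # Gate each sample's first rule on all final outputs of the previous
    # batch, so whole batches of N samples run one after another.
    b = batch_of[wildcards.sample]
    return [] if b == 0 else batch_targets[b - 1]

rule rule_A:
    input:
        previous_batch_done           # empty for the first batch
    output:
        temp("results/{sample}/A.out")
    shell:
        "touch {output}"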

lculibrk avatar Jan 12 '24 15:01 lculibrk