snakemake
Depth first execution.
Snakemake versions:
- 7.24.0 and 8.1.0
Is your feature request related to a problem? Please describe. PROBLEM: I have a pipeline with two rules: download fastq and align. I want to process one sample at a time, OR prioritize depth-first processing. My pipeline keeps processing them breadth-first (it downloads all fastqs, then starts aligning them). I have read a few previously opened issues but I see no change.
Describe the solution you'd like I am more worried that I am doing something wrong, but maybe a depth-prioritized option?
ATTEMPTS:
I tried multiple things before I wrote here.
I used the following command to launch my workflow:
snakemake --snakefile test.snakemake2 --configfile test.yaml --cores 9 --resource mem_mb=200 --forceall -n --prioritize STARsolo_automatic
As suggested in previously opened issues:
- priority didn't work (this)
- using the most recent snakemake didn't help (solver is ILP) (this)
- resource allocation: I limited total memory to 200 and gave 120 to download_fastq and 80 to alignment. In both 7.24 and 8.1.0 both "working" solutions run the same, so this solution is not working. Previous issue here
I would appreciate any help or guidance.
Thank you very much for developing and maintaining snakemake.
Best,
T.
snakefile
```python
def get_results():
    out_list = []
    for sample in config["samples"]:
        out_list.append(f"results/{config['project']}/{sample}/STAR/output/Velocyto/filtered")
    return out_list

def gather_all_fastqs(wildcards):
    out_list = []
    for srr in config["samples"][wildcards.sample]:
        out_list.append(f"raw-data/{wildcards.project}/{wildcards.sample}/{srr}_1.fastq")
        out_list.append(f"raw-data/{wildcards.project}/{wildcards.sample}/{srr}_2.fastq")
    print(out_list)
    return out_list

rule all:
    input: get_results()

rule download_fastq:
    output:
        R1_fastq=temp("raw-data/{project}/{sample}/{SRR}_1.fastq"),
        R2_fastq=temp("raw-data/{project}/{sample}/{SRR}_2.fastq")
    threads:
        8
    resources:
        mem_mb=120
    log:
        "raw-data/{project}/{sample}/log.download.{SRR}"
    shell:
        """
        touch {output.R1_fastq}
        touch {output.R2_fastq}
        """

rule STARsolo_automatic:
    input:
        gather_all_fastqs,
        # y="raw-data/{project}/{sample}/fastq_check_done"
    output:
        directory("results/{project}/{sample}/STAR/output/Velocyto/filtered")
    log:
        "results/{project}/{sample}/log.STAR.txt"
    threads:
        8
    priority:
        50
    resources:
        mem_mb=80
    benchmark:
        "results/{project}/{sample}/benchmark.STAR.txt"
    shell:
        """
        mkdir -p {output}
        """
```
test.yaml
```yaml
project: "testproject-001"
samples:
  GSM3148577_BC10_TUMOR1:
    - "SRR7191904"
    - "SRR7191905"
  sample2:
    - "SRR7191904"
  sample3:
    - "SRR7191904"
  sample4:
    - "SRR7191904"
```
If you want to process one sample at a time, use `--jobs 1` and it should run `STARsolo_automatic` after `download_fastq` for each sample. If you use `--cores 9` or `--jobs 9` and you have fewer than 9 samples, then snakemake will download all fastqs first and then align them.
If this doesn't help, try to simplify your example in such a way that people can easily reproduce it.
Hi, you may try to set a disk storage limitation in snakemake.
Here are the related documents: https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#standard-resources
Here is a related test: https://github.com/snakemake/snakemake/blob/6175ea380b92db0fa9956c3b9b57d6e911164b74/tests/test_inferred_resources/Snakefile
Dear @dariober, Thank you very much for your suggestion. I think I am getting to the bottom of my problem.
snakemake --snakefile code/velo-db-utils/test.snakemake2 --jobs 1
When I execute it, it runs like you suggested. Below you see me actually executing it and dry-running it. I remember trying `--jobs 1` before and not getting what I wanted. I think the reason is that the actual execution runs the way I wanted; however, in the dry-run the order is simply rule1 rule1 rule1 > rule2 rule2 rule2. The behaviour is the same for v7.24 and v8.1. Then I have an ignorant question: is there a way I can see the order of execution? The DAG does not show the order, and while I can look at the dry-run, you see the problem below. I also don't see the jobid reflecting the execution order.
Dear @Hocnonsense, I previously played with the disk_mb setting in the resources but got no help from that, though maybe I was not paying attention to the order.
Thank you very much, both of you.
Toy snakefile
```python
samples = ["a", "b", "c"]

rule all:
    input:
        expand("results/test/final_result_{data}", data=samples)

rule rule1:
    output:
        temp("results/test/int_result_{sample}")
    threads:
        8
    resources:
        disk_mb=100
    priority:
        1
    shell:
        """
        touch {output}
        sleep 5
        """

rule rule2:
    input:
        "results/test/int_result_{sample}"
    output:
        "results/test/final_result_{sample}"
    threads:
        8
    priority:
        50
    resources:
        disk_mb=100
    shell:
        """
        touch {output}
        sleep 5
        """
```
execution
$ snakemake --snakefile code/velo-db-utils/test.snakemake2 --jobs 1
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 1 (use --cores to define parallelism)
Rules claiming more threads will be scaled down.
Job stats:
job count min threads max threads
----- ------- ------------- -------------
all 1 1 1
rule1 3 1 1
rule2 3 1 1
total 7 1 1
Select jobs to execute...
[Tue Jan 9 17:32:48 2024]
rule rule1:
output: results/test/int_result_a
jobid: 2
reason: Missing output files: results/test/int_result_a
wildcards: sample=a
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:32:53 2024]
Finished job 2.
1 of 7 steps (14%) done
Select jobs to execute...
[Tue Jan 9 17:32:53 2024]
rule rule2:
input: results/test/int_result_a
output: results/test/final_result_a
jobid: 1
reason: Missing output files: results/test/final_result_a; Input files updated by another job: results/test/int_result_a
wildcards: sample=a
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:32:58 2024]
Finished job 1.
2 of 7 steps (29%) done
Removing temporary output results/test/int_result_a.
Select jobs to execute...
[Tue Jan 9 17:32:58 2024]
rule rule1:
output: results/test/int_result_b
jobid: 4
reason: Missing output files: results/test/int_result_b
wildcards: sample=b
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:33:03 2024]
Finished job 4.
3 of 7 steps (43%) done
Select jobs to execute...
[Tue Jan 9 17:33:03 2024]
rule rule2:
input: results/test/int_result_b
output: results/test/final_result_b
jobid: 3
reason: Missing output files: results/test/final_result_b; Input files updated by another job: results/test/int_result_b
wildcards: sample=b
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:33:08 2024]
Finished job 3.
4 of 7 steps (57%) done
Removing temporary output results/test/int_result_b.
Select jobs to execute...
[Tue Jan 9 17:33:08 2024]
rule rule1:
output: results/test/int_result_c
jobid: 6
reason: Missing output files: results/test/int_result_c
wildcards: sample=c
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:33:13 2024]
Finished job 6.
5 of 7 steps (71%) done
Select jobs to execute...
[Tue Jan 9 17:33:13 2024]
rule rule2:
input: results/test/int_result_c
output: results/test/final_result_c
jobid: 5
reason: Missing output files: results/test/final_result_c; Input files updated by another job: results/test/int_result_c
wildcards: sample=c
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:33:18 2024]
Finished job 5.
6 of 7 steps (86%) done
Removing temporary output results/test/int_result_c.
Select jobs to execute...
[Tue Jan 9 17:33:18 2024]
localrule all:
input: results/test/final_result_a, results/test/final_result_b, results/test/final_result_c
jobid: 0
reason: Input files updated by another job: results/test/final_result_a, results/test/final_result_c, results/test/final_result_b
resources: tmpdir=/tmp
[Tue Jan 9 17:33:18 2024]
Finished job 0.
7 of 7 steps (100%) done
Complete log: .snakemake/log/2024-01-09T173247.474301.snakemake.log
dry-run
(snakemake2023) $ cat snakeout
Building DAG of jobs...
Job stats:
job count min threads max threads
----- ------- ------------- -------------
all 1 1 1
rule1 3 1 1
rule2 3 1 1
total 7 1 1
[Tue Jan 9 17:35:53 2024]
rule rule1:
output: results/test/int_result_b
jobid: 4
reason: Forced execution
wildcards: sample=b
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:35:53 2024]
rule rule1:
output: results/test/int_result_c
jobid: 6
reason: Forced execution
wildcards: sample=c
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:35:53 2024]
rule rule1:
output: results/test/int_result_a
jobid: 2
reason: Forced execution
wildcards: sample=a
priority: 1
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:35:53 2024]
rule rule2:
input: results/test/int_result_c
output: results/test/final_result_c
jobid: 5
reason: Input files updated by another job: results/test/int_result_c
wildcards: sample=c
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:35:53 2024]
rule rule2:
input: results/test/int_result_a
output: results/test/final_result_a
jobid: 1
reason: Input files updated by another job: results/test/int_result_a
wildcards: sample=a
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
[Tue Jan 9 17:35:53 2024]
rule rule2:
input: results/test/int_result_b
output: results/test/final_result_b
jobid: 3
reason: Input files updated by another job: results/test/int_result_b
wildcards: sample=b
priority: 50
resources: tmpdir=/tmp, disk_mb=100, disk_mib=96
Would remove temporary output results/test/int_result_c
Would remove temporary output results/test/int_result_a
Would remove temporary output results/test/int_result_b
[Tue Jan 9 17:35:53 2024]
localrule all:
input: results/test/final_result_a, results/test/final_result_b, results/test/final_result_c
jobid: 0
reason: Input files updated by another job: results/test/final_result_b, results/test/final_result_a, results/test/final_result_c
resources: tmpdir=/tmp
Job stats:
job count min threads max threads
----- ------- ------------- -------------
all 1 1 1
rule1 3 1 1
rule2 3 1 1
total 7 1 1
Reasons:
(check individual jobs above for details)
forced:
rule1
input files updated by another job:
all, rule2
missing output files:
rule1
This was a dry-run (flag -n). The order of jobs does not reflect the order of execution.
Sorry for my last reply; the param disk_mb seems to be for cluster jobs. However, you can set the priority of rule2 ten times higher than rule1, and once rule1 for sample=a is done, rule2 for sample=a is selected before any other rule1.
My solution to this was to use input functions to create a "linear" DAG.
If you have a workflow of A->B->C and, e.g., sample_1 and sample_2 to process, you can write an input function for rule A that takes wildcards as args and:
- if the sample is the first sample you want to process, returns None/an empty string;
- if the sample is not the first sample, returns the path to rule C's output for the previous sample.
In this case the input function would return empty for sample_1, and for sample_2 it would return sample_1's rule C output. This forces the DAG to run A->B->C for sample_1 before running A->B->C for sample_2. This also works for chunking: e.g., in the "header" of my Snakefile I would iterate over the list of final files to make, create sublists of N files (N being the batch size), and assign them to a dict of key:sublist, where the key is something I can provide via wildcards in the relevant rule. Then it's just a `lambda wildcards: filesdict[wildcards.something]` to get those files as input for a job and cause batched execution.
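The two tricks above can be sketched in plain Python (the sample names, output paths, and the helper names `serial_input` and `batch_files` are all hypothetical; in a real Snakefile, `serial_input` would be wrapped in an input function receiving the wildcards object):

```python
# Hypothetical ordered list of samples, as it might come from a config.
SAMPLES = ["sample_1", "sample_2", "sample_3"]

def serial_input(sample, samples=SAMPLES):
    """Return no extra input for the first sample; otherwise return the
    final (rule C) output of the previous sample, forcing A->B->C to
    finish for sample N before rule A starts for sample N+1."""
    idx = samples.index(sample)
    if idx == 0:
        return []  # first sample: no ordering constraint
    prev = samples[idx - 1]
    return [f"results/{prev}/C.done"]  # hypothetical rule C output path

def batch_files(final_files, batch_size):
    """Chunk the list of final targets into sublists of `batch_size`,
    keyed by a batch id string usable as a wildcard value."""
    return {
        str(i // batch_size): final_files[i : i + batch_size]
        for i in range(0, len(final_files), batch_size)
    }

# In a Snakefile, the batched rule's input would then be something like:
#   input: lambda wildcards: filesdict[wildcards.batch]
```

With `batch_files(targets, 2)` you would get `{"0": first two targets, "1": next two, ...}`, so each batch key can drive one job that depends on its whole sublist.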