flowsaber
A dataflow-based workflow framework.
Work in progress.
Features
- Intuitive syntax: Dataflow-like flow/task composing syntax similar to function calls (see the short sketch after this list).
  - Inspired by nextflow's DSL2.
- Pure python: No DSL; import/compose/modify Task/Flow python objects at will.
  - Extensible and interactive due to the dynamic nature of Python.
  - Task cache.
  - ...
- Concurrent: Tasks run implicitly in parallel in an asyncio event loop.
- Distributable: Uses Dask distributed as the Task executor; can be deployed locally, on a cluster, or in the cloud.
- Hybrid execution model.
  - Build flows in local Python or the web UI.
  - Schedule/monitor flow execution on a remote server through Python or the web UI.
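For a quick taste of the composition style before the full examples, here is a minimal sketch; it uses only the API shown in the examples below, and the double task and demo flow are illustrative names, not part of the library.

from flowsaber.api import *

@task
def double(num):  # a plain python function becomes a Task
    return num * 2

@flow
def demo(num):
    # tasks compose like function calls or through the pipe operator
    return double(num) | double | view

# feed values through a Channel, resolve the dependency graph, then run it
run(demo(Channel.values(1, 2, 3)))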
Web UI
Install
pip install flowsaber
Example
A minimal working example covering most features and usages of flowsaber.
from flowsaber.api import *

@task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

@shell
def write(num):
    """echo {num} > {num}.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
        | write | read | flatten \
        | map_(lambda x: int(x.strip())) \
        | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)

run(workflow)
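When run, each input number is fed to two copies of sub_flow (add, square, add), the two results are multiplied, the product is written to a .txt file by the shell task, read back, flattened into lines, parsed back to integers, and printed by view.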
Example 2
This is a bioinformatics workflow, a rewritten version of the snakemake tutorial.
import flowsaber
from flowsaber.api import *

@shell
def bwa(self, fa: File, fastq: File):  # inputs are automatically converted if type-annotated
    """bwa mem -t {self.config.cpu} {fa} {fastq} | samtools view -Sb - > {fastq.stem}.bam"""
    return "*.bam"  # for a ShellTask, a str in the return value is treated as a File and globbed

@shell
def sort(bam: File):  # self is optional in case you don't want to access the current task
    """samtools sort -o {sorted_bam} {bam}"""
    sorted_bam = f"{bam.stem}.sorted.bam"
    return sorted_bam

@shell(publish_dirs=["results/vcf"])
def call(fa: File, bams: list):  # in case you need to write some python code
    """samtools mpileup -g -f {fa} {bam_files} | bcftools call -mv - > all.vcf"""
    bam_files = ' '.join(str(bam) for bam in bams)
    return "all.vcf"

@task
def stats(vcf: File):
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    from pysam import VariantFile
    quals = [record.qual for record in VariantFile(str(vcf))]
    plt.hist(quals)
    plt.savefig("report.svg")

@flow
def call_vcf_flow():
    def _call(bams):  # a task is a normal function, use python as you wish
        return call(fa, bams)

    context = flowsaber.context
    fa = Channel.value(context.fa)
    fastq = Channel.values(*context.fastq)

    bam1 = bwa(fa, fastq)  # channels are automatically cloned
    bam2 = bwa(fa, fastq)
    mix(bam1, bam2) | sort | collect | _call | stats

prefix = 'tests/test_flow/snamke-demo.nosync/data'
with flowsaber.context({
        "fa": f'{prefix}/genome.fa',
        "fastq": [f'{prefix}/samples/{sample}' for sample in ['A.fastq', 'B.fastq', 'C.fastq']]
}):
    # resolve dependency
    workflow = call_vcf_flow()
run(workflow)
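The flow follows the snakemake tutorial: each fastq sample is mapped against the genome with bwa, the resulting bams are sorted, all sorted bams are collected into one joint bcftools call, and stats plots a histogram of the variant quality scores; publish_dirs presumably publishes the final all.vcf under results/vcf.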
Example of running remotely
Both the server and the agent need to be running in the background before submitting flowruns.
Start the server (API endpoint)
In a bash shell.
flowsaber server
Start the agent (Flow dispatcher)
In a bash shell.
flowsaber agent --server "http://127.0.0.1:8000" --id test
Create a flow and schedule it to run
In a Python script or IPython console.
import sys

from flowsaber.api import *

@task
def add(num):
    print("This is a message sent by print to stdout in the task")
    print("This is a message sent by print to stderr in the task", file=sys.stderr)
    a = 1
    for i in range(10000000):
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)

run(f, server_address="http://127.0.0.1:8000", agent_id="test")
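The print calls and the busy loop only generate some stdout/stderr output and CPU time so there is something to observe; run submits the flow to the server at server_address, which dispatches it to the agent registered with --id test.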
Test
python -m pytest tests -s -o log_cli=True -vvvv
TODO
- [ ] PBS/Torque executor.
- [ ] More cache modes.
- [ ] Support running on cloud platforms.
- [ ] Run CWL scripts; convert between CWL and flowsaber flows.