
Declarative DSL for piping high-volume data between tasks/processes on the same node


Usage scenario

Problem Description

Highly tuned workflows may contain a sequence of processes that are connected by named pipes, UNIX pipes, or similar mechanisms (sockets?) to avoid repeatedly writing large amounts of data to the filesystem. Consider the (almost real-world) Bash example:

# Decompress both FASTQ files into named pipes in the background.
mkfifo fastq1_fifo fastq2_fifo
gunzip -c "$fastq1" > fastq1_fifo & gzip1Pid=$!
gunzip -c "$fastq2" > fastq2_fifo & gzip2Pid=$!

# Align, convert to BAM, and tee the stream into two QC pipes.
mkfifo qc_fifo1 qc_fifo2
bwa mem ... fastq1_fifo fastq2_fifo \
  | samtools view -b - \
  | tee qc_fifo1 qc_fifo2 > output.bam & bwaPid=$!
qc1.pl qc_fifo1 > qc1.txt & qcPid1=$!
qc2.py qc_fifo2 > qc2.txt & qcPid2=$!

wait $bwaPid $gzip1Pid $gzip2Pid $qcPid1 $qcPid2

Compared to an implementation that does not use pipes, this may save hundreds of GB of input/output, but it is fragile and wordy. On top of that, wait with multiple PIDs returns only a single exit code, so the individual exit codes are lost (loss of information for debugging), and the logs of the different processes may be interleaved and hard to read unless further measures are taken. Furthermore, implementing this correctly and safely in Bash is hard (e.g. remember to set -ue; set -o pipefail; ...). From a maintenance perspective this code is completely unacceptable -- and this is only a small example. Unfortunately, it is too close to real-life code to be funny.
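To illustrate how much boilerplate "doing it properly" takes, here is a minimal Bash sketch (not from the issue; all file names, the reference ref.fa, and the resource layout are placeholders) that adds strict error flags, a per-process log file, and a separate wait per PID so that no exit code is lost:

#!/usr/bin/env bash
# Minimal sketch of "safe" manual piping in Bash; all names are placeholders.
set -ue
set -o pipefail

fastq1=sample_R1.fastq.gz   # placeholder inputs
fastq2=sample_R2.fastq.gz

workdir=$(mktemp -d)
trap 'rm -rf "$workdir"' EXIT
mkfifo "$workdir/fastq1_fifo" "$workdir/fastq2_fifo"

pids=()

# Start each producer in the background, each with its own log file.
gunzip -c "$fastq1" > "$workdir/fastq1_fifo" 2> gunzip1.log & pids+=($!)
gunzip -c "$fastq2" > "$workdir/fastq2_fifo" 2> gunzip2.log & pids+=($!)

# The consumer pipeline reads from both FIFOs and is backgrounded as well.
bwa mem ref.fa "$workdir/fastq1_fifo" "$workdir/fastq2_fifo" 2> bwa.log \
  | samtools view -b - > output.bam 2> samtools.log & pids+=($!)

# Wait for each PID individually so that no exit code is lost.
status=0
for pid in "${pids[@]}"; do
    wait "$pid" || { echo "process $pid failed" >&2; status=1; }
done
exit "$status"

Even this sketch ignores cleanup of half-written outputs and the question of killing the surviving processes when one of them fails.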

Proposed NF-based Solution

A solution in NF would be very similar to the pipe() feature in Snakemake. Commands in different Nextflow tasks should be connectable by pipes using some declarative syntax, or they should be assignable to task groups that are guaranteed to be executed on the same compute node.

An NF DSL would

  • ... move the implementer's focus away from setup (mkfifo), error handling, logging, and multiprocessing to the actual core business of writing a bioinformatics workflow. This also makes multiprocessing more accessible to less technical users.
  • ... may allow easy switching between an implementation that produces all intermediate files and runs independent tasks -- for development and debugging -- and a piped implementation for production with ever-increasing NGS file sizes. Conversely, an NF global variable may be useful to turn off all piping and thus switch a failed production run into a workflow with all tasks running separately, which is more amenable to debugging (a Bash sketch of such a toggle follows this list). Still, with separate logs for the different tasks this might not even be necessary.
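As a rough illustration of that toggle idea (not an existing Nextflow feature; the use_pipes variable, the file names, and the consumer command are made up for this sketch), a task script could switch between a named pipe and a regular intermediate file:

# Sketch only: hypothetical toggle between piped and file-based execution.
set -ue
use_pipes=${use_pipes:-true}   # imagine this being set by the workflow engine

if [ "$use_pipes" = true ]; then
    mkfifo decompressed.fastq
    gunzip -c sample.fastq.gz > decompressed.fastq &   # producer streams into the FIFO
    wc -l < decompressed.fastq > linecount.txt         # consumer reads concurrently
    wait $!                                            # collect the producer's exit code
else
    gunzip -c sample.fastq.gz > decompressed.fastq     # materialise the intermediate file
    wc -l < decompressed.fastq > linecount.txt         # consumer runs afterwards
fi

The point is that the producer and consumer commands stay identical; only the plumbing around them changes, which is exactly the part a DSL could generate.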

From a DSL perspective this means

  • Add a pipe or similar annotation to allow declaring piped connections between tasks.
  • Allow assigning tasks to groups, possibly doing automatic piping between processes.
  • Maybe come up with another fancy, more explicit DSL. There are many more possible improvements that would support the user in defining more complex pipelines. Think of 1-in-k-out connectors (tee/mbuffer), k-in-1-out connectors (with guaranteed no overwriting by concurrent processes; similar to cat, but non-sequential/blocking for parallel inputs), or variable buffer sizes (mbuffer). A plain-Bash sketch of such connectors follows this list.
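For concreteness, this is roughly what those connectors look like when hand-written in Bash today (a sketch; qc_one, qc_two, slow_consumer, and the file names are placeholder commands reading stdin, and mbuffer must be installed on the node, with -m setting its buffer size):

# 1-in-k-out: duplicate one stream into two consumers via process substitution.
gunzip -c reads.fastq.gz \
  | tee >(qc_one > qc1.txt) >(qc_two > qc2.txt) \
  > /dev/null

# Variable buffer size: decouple a bursty producer from a slower consumer.
gunzip -c reads.fastq.gz | mbuffer -m 2G | slow_consumer > result.txt

# k-in-1-out: cat concatenates its inputs strictly one after the other,
# blocking on the first FIFO; a non-sequential merge of parallel inputs
# has no standard one-liner.
cat part1.fifo part2.fifo > merged.txt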

From a processing perspective this means

  • Keep logs of pipe-connected tasks separate, to ensure that log messages are unambiguously linked to processes (one of the problems of the naïve Bash implementation).
  • Group processes together and run them synchronously on the same compute node. They may communicate via named pipes (or maybe sockets?).
  • What should happen with a process that terminates abnormally (all software used can be buggy, right?) without a "broken pipe" signal ever propagating through the pipe? In this case an error in one task of the group should result in the (time-delayed?) killing of the other tasks, and this should of course be logged, because only then is it possible to tell which task was killed actively and which failed on its own. A Bash sketch of this grouping and cleanup follows this list.
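A hand-rolled version of that behaviour (purely illustrative; the start_task helper, the task commands, and the log names are made up for this sketch) would start every group member with its own log file and, as soon as one member exits non-zero, kill the rest and record who was killed:

# Sketch: run a group of pipe-connected tasks, each with a separate log,
# and kill the remaining members as soon as one of them fails.
declare -A task_pid

start_task() {                        # start_task <name> <shell command>
    local name=$1 cmd=$2
    bash -c "$cmd" > "$name.log" 2>&1 &      # each member gets its own log file
    task_pid[$name]=$!
}

mkfifo data.fifo
start_task producer "gunzip -c input.gz > data.fifo"
start_task consumer "wc -l < data.fifo > linecount.txt"

status=0
for name in "${!task_pid[@]}"; do
    if ! wait "${task_pid[$name]}"; then
        status=1
        echo "$name failed; killing the remaining group members" >&2
        for other in "${!task_pid[@]}"; do
            if [ "$other" != "$name" ] && kill "${task_pid[$other]}" 2>/dev/null; then
                echo "$other was killed actively (did not fail on its own)" >&2
            fi
        done
        break
    fi
done
wait            # reap any members that were killed
exit "$status"

This is exactly the kind of wrapper logic the proposal would like to see generated by Nextflow rather than maintained by hand in every workflow.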

NOTE: This is not a streaming feature like Apache Spark. No data would be transferred into the Nextflow process itself; the data would remain entirely on the compute node. Transferring data between nodes is explicitly not what is being asked for here. This feature is about making the Nextflow DSL usable for declaring process pipelines that run completely on the same node.

Suggested implementation

Not sure. One option would be to compose the processes into a single script (with wrappers to guarantee e.g. log separation, etc.). Another possibility would be to group processes at the level of the scheduler (LSF, SLURM, etc.); a hand-written sketch of what such scheduler-level grouping amounts to is shown below.
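For the scheduler-level option, the rough idea is that the whole group is submitted as one allocation whose resources are the sum of the members' requests, so all FIFOs live on one node. A hand-written SLURM batch script for the gunzip/bwa pair might look as follows (a sketch; the resource numbers, ref.fa, and file names are placeholders, and nothing like this is generated by Nextflow today):

#!/usr/bin/env bash
#SBATCH --nodes=1              # the whole group must share one node for the FIFOs
#SBATCH --cpus-per-task=12     # placeholder: sum of the group members' CPU requests
#SBATCH --mem=32G              # placeholder: sum of the group members' memory requests
set -ue
set -o pipefail

mkfifo reads.fifo
gunzip -c input.fastq.gz > reads.fifo &                producer=$!
bwa mem ref.fa reads.fifo > aligned.sam 2> bwa.log &   aligner=$!

wait "$producer"
wait "$aligner"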

vinjana, Dec 02 '20 17:12