bionode-watermill
Streaming Between Tasks
In order to be a streaming workflow engine, we need to support the ability to pipe between tasks. This does not add much to the "traditional" pipeline, which mostly reads/writes files between tasks, but it opens up interesting use cases:
- a stream of query responses from bionode-ncbi can be piped into pipelines
- ease of incorporating Node transform streams into pipelines
- separate tools (no need for a container with `A` and `B` just to run `A | B`, for example)
- cool things with `fork`, something like `A | fork(B1, B2) | C` (this will be tricky to implement, since we will need to create duplicate streams; see the sketch after this list)
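A minimal sketch of the duplicate-stream problem, using plain Node streams rather than any watermill API: each branch of a `fork` gets its own `PassThrough` copy of the source, so `B1` and `B2` both see every chunk `A` emits.

```js
const { PassThrough } = require('stream')

// Pipe one readable source into n independent PassThrough copies.
// A Node readable can be piped to several destinations at once, so each
// copy receives the same chunks.
function duplicate (source, n) {
  const copies = Array.from({ length: n }, () => new PassThrough())
  for (const copy of copies) source.pipe(copy)
  return copies
}

// e.g. const [toB1, toB2] = duplicate(outputOfA, 2)
```

Note that with plain `pipe`, flow is governed by the slowest branch, which is one reason this is tricky to get right.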
Dump from docs:
If either `input` or `output` is not provided, the task is assumed to be a streaming task - i.e., a duplex stream with writable and/or readable portions. Consider:
```js
const through = require('through2') // helper to create through streams

const throughCapitalize = through(function (chunk, enc, next) {
  // through2 passes each chunk, its encoding, and a callback to signal
  // that we are done pushing
  // push the capitalized chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // then call next so that the next chunk can be handled
  next()
})
```
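As a quick standalone check outside watermill, this through stream works with any Node streams; for example, echoing capitalized stdin (assuming the snippet above is in scope):

```js
// echo hello | node capitalize.js   -> prints HELLO
process.stdin.pipe(throughCapitalize).pipe(process.stdout)
```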
You could connect `capitalize` to a readable (`readFile`) and writable (`writeFile`) file stream with:
```js
const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))
```
```js
const fs = require('fs')

const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Add file information to the stream object so we have it later
  rs.inFile = input
  return rs // return the stream so it can be piped into the next task
})
```
```js
const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))
```
```js
// Can now connect the three:
join(readFile, capitalize, writeFile)
```
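One caveat: `swapExt` is not defined anywhere in this dump; it is used as if strings carried a helper that swaps a file's extension. A hypothetical standalone equivalent might be:

```js
// Hypothetical stand-in for the undefined swapExt helper used above:
// replace the final extension of a filename with a new one.
const swapExt = (filename, ext) => filename.replace(/\.[^.]+$/, `.${ext}`)
// e.g. swapExt('reads.lowercase', 'uppercase') === 'reads.uppercase'
```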
Of course, this could be written as a single task. That is somewhat simpler, but the power of splitting up the read, transform, and write portions will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, without having to manually rewire input and output filenames. As a single task, the above becomes:
```js
const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)
```
Tied to this issue, we should also be able to pass variables, functions, etc. between tasks, and still be able to call them in downstream tasks.
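One possible direction (not a settled API): since tasks already resolve with a stream object, extra values and functions could ride along as properties on that object, the same trick `readFile` uses above with `rs.inFile`. A minimal sketch, assuming the resolved object is what the next task receives as `input`:

```js
// Sketch only, not watermill API: attach values and functions to the object
// a task resolves with, so downstream tasks can read and call them.
const annotate = task({
  name: 'Attach metadata and helpers'
}, ({ input }) => {
  input.sample = 'reads.lowercase'                     // hypothetical shared value
  input.describe = () => `processing ${input.sample}`  // hypothetical shared function
  return input
})

const report = task({
  name: 'Use upstream metadata'
}, ({ input }) => {
  console.log(input.describe()) // call the function attached upstream
  return input
})
```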