
Streaming Between Tasks

Open thejmazz opened this issue 7 years ago • 1 comment

In order to be a streaming workflow engine, we need to support the ability to pipe between tasks. This adds little for the "traditional" pipeline, which mostly reads/writes files between tasks, but it opens up interesting use cases:

  • stream of query responses from bionode-ncbi can be piped into pipelines
  • ease of incorporating node transform streams into pipelines
  • separate tools (no need for a single container with both A and B just to run A | B, for example)
  • cool things with fork, something like A | fork(B1, B2) | C (though this will be tricky to implement, since we will need to duplicate streams; see the sketch after this list)
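
As a rough illustration of why fork is tricky, here is a minimal sketch (the `fork` helper is hypothetical, not the watermill API) that duplicates one readable into two branches with Node's `PassThrough`, ignoring backpressure for brevity:

const { PassThrough } = require('stream')

// Hypothetical: to run A | fork(B1, B2) | C, every chunk from A must be
// written to two independent branches that B1 and B2 can each consume.
function fork (source) {
  const branch1 = new PassThrough()
  const branch2 = new PassThrough()
  source.on('data', (chunk) => {
    branch1.write(chunk)
    branch2.write(chunk)
  })
  source.on('end', () => {
    branch1.end()
    branch2.end()
  })
  return [branch1, branch2]
}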

Dump from docs:

If either input or output is not provided, the task is assumed to be a streaming task, i.e. a duplex stream with writable and/or readable portions. Consider:

const through = require('through2') // a helper to create through streams

// A through stream receives each chunk, its encoding, and a callback to
// call once the chunk has been handled.
const throughCapitalize = through(function (chunk, enc, next) {
  // push the uppercased chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // then call next so the next chunk can be handled
  next()
})
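
Outside of a pipeline, such a through stream can be exercised directly, for example:

process.stdin.pipe(throughCapitalize).pipe(process.stdout)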

You could connect `capitalize` to a readable (`readFile`) and a writable (`writeFile`) file stream with:

const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))

const fs = require('fs')

const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Attach the filename to the stream object so downstream tasks can use it
  rs.inFile = input
  return rs
})

const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))

// Can now connect the three:
join(readFile, capitalize, writeFile)
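
Note that `swapExt` is not defined in this dump and is not a Node built-in. In the example it is called as a method on the value carried by the stream object; as a plain function, a minimal stand-in (assuming it only swaps the file extension) could look like:

const path = require('path')

function swapExt (file, ext) {
  // 'reads.lowercase' -> 'reads.uppercase'
  const base = file.slice(0, file.length - path.extname(file).length)
  return base + '.' + ext
}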

Of course, this could be written as a single task, which is somewhat simpler. But the power of splitting up the read, transform, and write portions becomes apparent once we can provide multiple sets of parameters to the transform and observe the effect, without manually rewiring input and output filenames. As a single task, the above becomes:

const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    // write to the declared output: swap .lowercase for .uppercase
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)

thejmazz · Aug 26 '17 21:08

Tied to this issue, we should also be able to pass variables, functions, etc. between tasks, and still be able to call them in downstream tasks.
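
One possible direction (just a sketch, not an implemented API) is to generalize the `rs.inFile` trick from the issue body: let a task attach arbitrary values, including functions, to the object it hands downstream.

// Hypothetical sketch, extending the rs.inFile pattern above
const annotate = task({
  name: 'Annotate stream'
}, ({ input }) => {
  input.props = { sample: 'A1', normalize: (s) => s.trim() }
  return input
})
// a downstream task would then receive these as input.props
// and could call input.props.normalize(...)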

tiagofilipe12 · Sep 19 '17 18:09