execa

Observable support with customizable through filters

Open jamestalmage opened this issue 9 years ago • 1 comments

var child = execa('./cli.js', [], {stdio: [null, 'observable', null]});

t.plan(5); // should get 5 lines
await child.stdout
  .filter(line => /^s/.test(line)) // only lines that start with "s"
  .forEach(line => t.true(fitsSpecifiedFormat(line)));

Additionally, it would be cool if you could use DuplexStreams to modify what reaches the observable. I'm thinking of a "newline-separated-json" byte stream converted to a JSON object stream, etc.

There are a number of "split-stream-XXX" modules already on NPM, but they convert to a stream. It would be cool to write object-stream-to-observable, and then use that to make CLI output processing awesome.

jamestalmage, Apr 23 '16 18:04

I like it. I always dread when I have to do anything with Node streams. Observables seem perfect for doing this kind of thing.

sindresorhus, Apr 27 '16 09:04

I am wondering whether the modern way to do this now (as this issue is 7 years old) would be to use async iterables. Both streams (including process.stdin|stdout|stderr) and readline.createInterface() are async iterables. Node.js has also added some functional methods to streams, including .filter(), .map() and so on.
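
As a rough illustration of that idea, here is a minimal sketch that filters lines with a stream's functional methods and then consumes the result as an async iterable. `linesStartingWithS` is a hypothetical helper, the input is an in-memory array rather than a real child process, and `Readable#filter()` is still marked experimental in Node.js, so treat this as an assumption-laden sketch rather than a recommended pattern.

```javascript
// Hypothetical helper: keeps only the lines that start with "s".
const linesStartingWithS = async lines => {
	// Dynamic import so the snippet runs in both CommonJS and ES modules.
	const {Readable} = await import('node:stream');

	// Readable#filter() is experimental; it returns another Readable.
	const filtered = Readable.from(lines).filter(line => line.startsWith('s'));

	// Readable streams are async iterables, so `for await` works directly.
	const matching = [];
	for await (const line of filtered) {
		matching.push(line);
	}

	return matching;
};

linesStartingWithS(['start', 'middle', 'stop']).then(result => {
	console.log(result); // [ 'start', 'stop' ]
});
```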

Based on this, it seems like we might be able to close this issue. What do you think?

ehmicky, Dec 18 '23 03:12

Agreed

sindresorhus, Dec 18 '23 16:12

I have been digging into this issue, and it turns out it is actually quite complicated for users to get right.

First, the .filter() and .map() functional methods I mentioned are only available on Readable streams, so they cannot be used with childProcess.stdin, which is a Writable stream.

Another problem with those functional methods is that they do not allow init and flush logic. This is quite problematic. For example, this prevents defining a mapping method that splits the input line by line (as suggested in #121).
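
To make the init/flush point concrete, here is a minimal sketch of a line splitter. It needs state that outlives a single chunk (the "init" part) and a final step once the input ends (the "flush" part), which a per-chunk .map() callback has no place for. `splitLines` is a hypothetical helper written as a plain async generator; it is not taken from execa.

```javascript
// Hypothetical helper: turns arbitrary string chunks into complete lines.
const splitLines = async function * (chunks) {
	// "Init" logic: state shared across all chunks.
	let partialLine = '';

	for await (const chunk of chunks) {
		const lines = `${partialLine}${chunk}`.split('\n');
		// The last item is an incomplete line ('' if the chunk ended with '\n').
		partialLine = lines.pop();
		yield * lines;
	}

	// "Flush" logic: emit whatever remains once the input has ended.
	if (partialLine !== '') {
		yield partialLine;
	}
};

const collectLines = async chunks => {
	const lines = [];
	for await (const line of splitLines(chunks)) {
		lines.push(line);
	}

	return lines;
};

collectLines(['ab', 'c\nde', 'f\n', 'gh']).then(lines => {
	console.log(lines); // [ 'abc', 'def', 'gh' ]
});
```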

Getting error handling right is difficult, especially ensuring that an error thrown in the user's mapping logic is propagated. That's because any error will abort streams, and often a generic AbortError will be propagated instead. That error is completely opaque, which makes debugging pretty difficult.

Yet another problem is that this does not work when combined with other stdin/stdout/stderr/stdio option values. One important case is: it is difficult for users to map stdout: 'inherit', since childProcess.stdout would then not be defined.

Other options which could benefit from having this built-in: the maxBuffer and encoding options. For example, if a transform has a big output, it would be beneficial to make the maxBuffer option take this into account.

Another issue is that when manually iterating chunks, those are Buffer instances, which we've been trying to avoid (as opposed to Uint8Array), and which are less convenient to use than strings for this specific use case.
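
For illustration, here is a minimal sketch of what users would otherwise have to write themselves to get strings out of binary chunks. It uses the standard `TextDecoder` with `{stream: true}` so that a multibyte character split across two chunks is still decoded correctly. `decodeChunks` is a hypothetical helper, and UTF-8 is an assumed encoding.

```javascript
// Hypothetical helper: decodes Uint8Array (or Buffer) chunks to strings.
const decodeChunks = function * (chunks) {
	const decoder = new TextDecoder('utf-8');

	for (const chunk of chunks) {
		// `stream: true` buffers incomplete multibyte sequences between calls.
		const text = decoder.decode(chunk, {stream: true});
		if (text !== '') {
			yield text;
		}
	}

	// Flush any bytes still buffered by the decoder.
	const rest = decoder.decode();
	if (rest !== '') {
		yield rest;
	}
};

// "é" is encoded as the two bytes 0xC3 0xA9, split here across two chunks.
const binaryChunks = [
	new Uint8Array([0x63, 0x61, 0x66, 0xC3]),
	new Uint8Array([0xA9]),
];
console.log([...decodeChunks(binaryChunks)].join('')); // café
```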

Making the mapping performant is important too. When using multiple transforms and one is slower than the others, it is beneficial to have some kind of buffering logic like Node.js streams highWaterMark. Getting this right can be tricky in some instances.

Based on all of the above, I have added a PR at #693 which implements this by using async generators passed to the stdin/stdout/stderr/stdio options.

const transform = async function * (chunks) {
	for await (const chunk of chunks) {
		yield chunk.toUpperCase();
	}
};

const {stdout} = await execa('echo', ['hello'], {stdout: transform});
console.log(stdout); // HELLO

ehmicky, Jan 16 '24 02:01

This new feature has now been merged to the main branch. If you're curious, please check its documentation.

To revisit the original message's example, it would look like this instead:

let count = 0

const testLine = function * (line) {
  if (line.startsWith('s')) {
    count += 1
    t.true(fitsSpecifiedFormat(line))
  }
}

await execa('./cli.js', { stdout: testLine })
t.is(count, 5)

Although arguably for a unit test, it would be easier to not use streaming and just do:

const { stdout } = await execa('./cli.js')
const lines = stdout.split('\n').filter(line => line.startsWith('s'))
t.is(lines.length, 5)
t.true(lines.every(line => fitsSpecifiedFormat(line)))

ehmicky, Jan 17 '24 02:01