execa
Consider implementing a duplex mode
Basically, a way to make a predictable duplex stream from stdin and stdout (or stderr), so you can use a child process as a transform stream. It'd emit an error if the process errors, and it'd kill the process if the stream is destroyed.
```js
const execa = require('execa');

myStream.pipe(execa.duplex('myFile', ['my', 'arguments'])).pipe(myOtherStream);
```
It's not terribly difficult to do something like this with execa as it is now:

```js
const execa = require('execa');

execa.stdout('myFile', ['my', 'arguments'], {input: myStream}).pipe(myOtherStream);
```
but this can behave in unexpected ways.
```js
// ENOENT gets swallowed; the stream completes without an error.
execa.stdout('myFiel', ['my', 'arguments'], {input: myStream});

const processStream = execa.stdout('myFile', ['my', 'arguments'], {input: myStream});
// `stdout` is destroyed, but the process doesn't die.
processStream.destroy();
```
There is a $70.00 open bounty on this issue.
I like it. I could see myself using this in some places.
I would name it `duplexStream()` instead of `duplex()`. I think that's clearer.
@issuehunt has funded $70.00 to this issue.
I was looking into working on this, but I ran into a small issue.
If this syntax were to work:

```js
myStream.pipe(execa.duplex('myFile', ['my', 'arguments'])).pipe(myOtherStream);
```

we'd have to return a duplex stream:
```js
new stream.Duplex({
	write(...writeArgs) {
		return spawned.stdin.write(...writeArgs);
	},
	read(...readArgs) {
		return spawned.stdout.read(...readArgs);
	}
});
```
but we can't close this stream once we're done writing the stdin data. How do we work around that?
Maybe use a `stream.PassThrough`. I haven't really looked into this issue closely though.
Trying to work on it too. Some questions:
- Should a non-zero exit code emit an `error` event on the duplex?
- If the user calls `duplex.destroy()`, should the process be killed, or do we let it run and possibly fail due to a broken pipe?
Rough, mostly untested first try:
https://github.com/sloonz/execa/commit/9708d7ce4e53b3919737e6095455117b37a1c049
I am wondering whether the new pipe methods might already fix the underlying problem of this issue?
```js
// stdout -> stdin
await execa(...).pipeStdout(execa(...));

// stdout -> stream
await execa(...).pipeStdout(otherStream);

// stdout -> file
await execa(...).pipeStdout('path');

// stderr -> stdin/stream/file
await execa(...).pipeStderr(...);

// stdout + stderr -> stdin/stream/file
await execa(...).pipeAll(...);

// stream -> stdin
await execa(..., {stdin: stream});

// file -> stdin
await execa(..., {inputFile: 'path'});

// string -> stdin
await execa(..., {input: 'value'});
```
So it is actually quite easy now to pipe multiple processes:
```js
const {stdout: topTenFilenames} = await execa('ls', ['exampleDir'])
	.pipeStdout(execa('sort'))
	.pipeStdout(execa('head', ['-n', '10']));
```
If anyone wants to work on this, see the feedback in: https://github.com/sindresorhus/execa/pull/424
It's taken me quite some time, but I finally got this working. I have a PR at #912 which implements this, together with `.readable()` and `.writable()` methods.
## Motivation
The purpose is to use a subprocess as a stream, to pass it to APIs which accept streams. This contrasts with:
- The `std*: stream` options, which do the opposite, i.e. pass a stream to a subprocess
- The `.pipe()` method, which only focuses on subprocess piping, and does it well
## Readable vs Writable vs Duplex

Users might want to only use the readable/writable side of the subprocess, but still convert it to a stream. Using `subprocess.stdin` or `subprocess.stdout` works in some cases, but not if the user wants to:
- Wait for the subprocess to complete
- Propagate any error from the subprocess to the stream, and the other way around

If a user wants a read-only stream, it is improper to return a `Duplex` with an already ended writable side. Same for write-only streams. That's because the Node.js stream API has some subtle logic meant for `Duplex` only. For example, `finished(duplex)` (unlike `finished(readable)` and `finished(writable)`) does not necessarily wait for the `close` event and might miss errors thrown during `stream._destroy()`. That seems like a minor thing, but because `finished()` is used everywhere by the Node.js stream API, this leads to many subtle bugs when using a `Duplex` as if it were a `Readable` or `Writable`.
I actually started implementing this as a single `.duplex()` method, but it became quite clear that the proper way to do this while respecting the Node.js stream API's inner workings was to expose 3 different methods: readable, writable, duplex.
```js
const streamOne = subprocess.readable();
const streamTwo = subprocess.writable();
const streamThree = subprocess.duplex();
```
This also means implementing this as a `PassThrough` does not work, since those are duplexes. That was my first implementation attempt, and I had to backtrack from that approach.
This also means `Duplex.from()` cannot be used, since it returns duplexes. I also made an implementation attempt with this, and it just did not work well.
## File descriptors

Users should be able to choose the file descriptor. My PR does it by re-using the `from` and `to` options from the `.pipe()` method. It works for any file descriptor, not only `stdin`/`stdout`/`stderr`. `all` can be specified to get both `stdout` and `stderr`. For example:
```js
const stream = subprocess.readable({from: 'stderr'});
```
## Method vs property

Exposing this as a property (as I previously suggested in https://github.com/sindresorhus/execa/issues/591) does not work because:
- It does not allow choosing the file descriptor (as described above)
- If the user does not use that property, the stream will still pull data and propagate errors, which might break things for existing users and waste resources

So it needs to be created on-demand via a method.
## Error propagation

The stream awaits the subprocess' completion. If the subprocess fails, an `error` event is emitted on the stream. This means users do not need to `await subprocess` anymore, as long as they await the stream (using methods like `text()`, `finished()`, `pipeline()`, etc.).

In the other direction, the subprocess does not wait for the stream: there is no need to. However, if the subprocess takes some input, its `stdin` needs to be ended, either through the stream or through `subprocess.stdin`.
If the stream errors and/or is destroyed, it destroys `subprocess.stdin` or `subprocess.stdout`/`subprocess.stderr`. It does not terminate the subprocess via a signal. Doing so would be improper, as it prevents graceful exit. Just like we do with the `.pipe()` method, we follow the way shells behave in this matter, which is to just close the subprocess' `stdin` or `stdout` and let the subprocess end on its own. If the subprocess uses `stdin`, ending `stdin` will make the subprocess end gracefully. If the subprocess writes to `stdout`, ending `stdout` will create `EPIPE` errors, making the subprocess end gracefully.
For duplexes, we purposely do not propagate errors between the readable and the writable side. There is no reason to, and it creates race conditions and subtle bugs.
## Multiple consumers

Using `subprocess.readable()` should not make execa's return value's `result.stdout` or `result.stderr` become empty. This was initially the case due to `get-stream` not supporting multiple consumers, so this required some major changes to that library.
Also, the PR allows calling `subprocess.readable()`, `subprocess.writable()` and `subprocess.duplex()` more than once, on either the same file descriptor or different ones.
## Hard parts
Solving this issue was very difficult. I tried many approaches and was close to giving up a few times, especially with tests that failed in CI but not locally due to hard-to-debug race conditions. I am pretty confident with the current PR though as it has lots of automated tests covering edge cases.
Some of the things that made this hard to implement:
- Cannot re-use `Duplex.from()`, `PassThrough` or piping. In other words, custom `Readable`/`Writable`/`Duplex` classes must be defined with their own implementations of `_write`/`_read`/`_final`/`_destroy`.
- Needs to work in object mode, since `subprocess.std*` can now be in object mode when using transforms.
- Needs to respect the stream's `highWaterMark` and buffering behavior.
- `.readable()` must not buffer too much, automatically pausing/resuming `subprocess.stdout` when its buffer gets full.
- Needs to work with the `autoDestroy` option, which tends to propagate state when not wanted.
- `subprocess.stdin`, `subprocess.stdout` and `subprocess.stderr` are weird. By default, those 3 are actually duplex TCP sockets. `subprocess.stdin` is writable and not readable, but `subprocess.stdout`/`subprocess.stderr` are both writable and readable.
## Web streams
My PR is implementing this with Node.js streams. All of this work should be done for web streams too. I have opened https://github.com/sindresorhus/execa/issues/913 to track this.
I believe we should let users choose whether they want a Node.js stream or a web stream. While Node.js streams are legacy, most Node modules still only accept them. Also, from having worked with both a lot, I expect the implementation with web streams to be more complicated, slower and less stable. Finally, there are a few things that only Node.js streams can do, such as having multiple readers at once, which might be the only option in specific cases.
That being said, we definitely should implement this for web streams too.
## Method names

Because of the above, my PR uses the following names, which match the name of the stream class itself and the TypeScript type:
- `.readable()`, `.writable()`, `.duplex()`: Node.js streams
- `.readableStream()`, `.writableStream()`, `.transformStream()`: web streams
This is impressive research and work. I'm pretty sure I would have given up long before you. 👏
I wonder if there is anything Node.js could improve that would have made this work easier. If so, definitely open some Node.js issues. For example, a version of `Duplex.from()` that can be reused.
One of the issues with `Duplex.from()` is that it is not designed to create read-only or write-only streams. It is possible to do it, but the result is still a `Duplex` under the hood, which creates a few issues. But I think that's by design.
Also, it propagates state changes between the write and read sides. In most cases this is good, but it was a real problem for this specific issue.
The problem with streams is that, by their own nature, they combine multiple things that lead to complexity: performance sensitivity, memory sensitivity, statefulness, asynchrony, and extensibility by users. Then this issue combines them with OS processes, which are complex too. :)
@sindresorhus has rewarded $63.00 to @ehmicky. See it on IssueHunt
- :moneybag: Total deposit: $70.00
- :tada: Repository reward (0%): $0.00
- :wrench: Service fee (10%): $7.00