Stdout/stderr pause/resume
There is no way to pause/resume the standard output/error when the NuProcessHandler cannot handle the buffers.
It would be a useful addition for the Vert.x 3 integration https://github.com/vietj/vertx-childprocess
Are there plans to implement such a feature?
@vietj I haven't tried it, but this should work...
If you simply don't read any data when you are called back, the pipe will fill up and the child process itself will block. You would need to keep track of the fact that you have pending reads, and later read the ByteBuffer to create room in the pipe. That will unblock the child process.
Let me know if it works.
ok, you mean I leave the byte buffer untouched. I will let you know.
So that would work to pause the stream, but how do I unpause it and request data again? I.e., how do I get a new callback to drain the pipe?
You won't get called back again. Presumably you somehow know when you want to pause and resume, so your callback will need to retain a reference to the ByteBuffer and you will need some other thread or timer to drain the buffer.
It's not exactly clear to me why vertx would need to suspend a stream in the first place...
I see, that sounds doable and I will try.
Vert.x would do that to propagate flow control to another stream. You can read this short part of the docs: http://vertx.io/docs/vertx-core/java/#streams
Hi,
I'm looking further at this and the javadoc says "You do not own the ByteBuffer provided to you. You should not retain a reference to this buffer." This seems to contradict retaining a reference to the ByteBuffer.
@vietj Oops, I apologize, the JavaDoc is out of date. It used to be that buffers were reused across processes, but after refactors by @bhamiltoncx this is no longer true. Each process instance has its own stdout, stderr, and stdin ByteBuffer instances.
@brettwooldridge Does NuProcess have any plan to implement Reactive Streams?
@brettwooldridge How can we ensure that the buffer is not modified concurrently by the NuProcess event loop and a possible drain by another thread?
@brettwooldridge Also, when I now access the buffer outside the NuProcess event loop, I get buffer.remaining() == 0 (the buffer is full), which means the buffer was flipped. When using it outside the event loop, should we flip this buffer again?
@pfxuan This is the first that I've heard of the Reactive Streams initiative, but NuProcess would certainly welcome any contribution. If someone were to make such an effort, I would recommend the implementation go into a com.zaxxer.nuprocess.streams or com.zaxxer.nuprocess.reactive package.
@vietj Can you post your NuProcessHandler implementation, either here or linked as a gist?
@brettwooldridge Sounds like a good plan! If @vietj can do this, the integration is going to become very natural: http://vertx.io/docs/vertx-reactive-streams/java/
@brettwooldridge here is the handler https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/main/java/io/vertx/ext/childprocess/impl/ChildProcessImpl.java
here is a test https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/test/java/io/vertx/ext/childprocess/SpawnTest.java#L214
if you run this test (on osx at least), after resume the buffer size is 0.
@pfxuan That's possible, but it would be best to first come up with a solution that works with the current API :-) (unless I'm not using the current API correctly).
@vietj Agree! I'm looking forward to seeing the first prototype. This integration is very useful.
@vietj Ok, looking at the code, this is going to be a little tricky. ByteBuffer is not a thread-safe class, so having multiple threads (NuProcess's and yours) accessing the buffer (at the same time) is going to be trouble.
NuProcess is going to compact the buffer after calling your handler. The contract is that the handler should read as much as it can. If you want to create back-pressure, to block the spawned process from writing more data, you need to let that buffer fill up, which you seem to be doing. If you flip the buffer, the back-pressure goes away, so don't do that.
If your handler is called, and remaining != 0, it is not safe to interact with the ByteBuffer from another thread. Once your handler is called with remaining == 0, it should be safe to interact with the ByteBuffer. Note, once remaining == 0, you may not ever get called again (but you may be called multiple times with remaining == 0).
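The flip/compact cycle described above can be illustrated with a small stand-alone sketch that uses only java.nio.ByteBuffer. This is not NuProcess code: BackPressureDemo, Handler, and pump are hypothetical names, the 8-byte buffer is arbitrary, and the sketch assumes the loop flips the buffer before the callback and compacts it afterwards. It shows why a handler that reads nothing leaves the buffer with remaining() == 0 once control returns to the loop.

```java
import java.nio.ByteBuffer;

// Illustrative only: mimics a read loop of the kind described in this thread.
// Not NuProcess's implementation; all names here are hypothetical.
final class BackPressureDemo {

    /** Hypothetical stand-in for a stdout callback; it sees the buffer in read mode. */
    interface Handler {
        void onData(ByteBuffer buffer);
    }

    /**
     * Simulates one pass of the loop: copy as much of the chunk as fits,
     * flip for the handler, then compact so unread bytes are kept.
     * Returns how many bytes of the chunk were accepted into the buffer.
     */
    static int pump(ByteBuffer buffer, byte[] chunk, Handler handler) {
        int accepted = Math.min(buffer.remaining(), chunk.length); // free space in write mode
        buffer.put(chunk, 0, accepted);
        buffer.flip();          // read mode: position = 0, limit = bytes available
        handler.onData(buffer);
        buffer.compact();       // move unread bytes to the front, back to write mode
        return accepted;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        byte[] chunk = {1, 2, 3, 4};

        Handler paused = b -> { /* read nothing, so the data stays in the buffer */ };
        Handler draining = b -> b.position(b.limit()); // consume everything

        System.out.println(pump(buffer, chunk, paused));   // 4
        System.out.println(pump(buffer, chunk, paused));   // 4 (buffer is now full)
        System.out.println(pump(buffer, chunk, paused));   // 0 (no room left)
        System.out.println(buffer.remaining());            // 0
        System.out.println(pump(buffer, chunk, draining)); // 0 (handler drains the 8 stored bytes)
        System.out.println(buffer.remaining());            // 8
    }
}
```

In this sketch, remaining() == 0 after the callback means "no free space" (write mode), which matches the observation earlier in the thread that the buffer looks full when inspected outside the event loop.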
My concern now is, without a change in NuProcess, the NuProcess thread may spin -- consuming lots of CPU.
@vietj The more I think about it, the more I think NuProcess will need to support a suspend()/resume() of some sort. The epoll (Linux), kqueue (Mac), and IO Completion Port (Windows) triggers need to be deregistered so that they aren't constantly firing that data is available -- but nothing is reading it -- causing NuProcess to spin and consume CPU.
@brettwooldridge Indeed, using the ByteBuffer outside of the NuProcess event loop sounds like an abuse, and pause/resume should be supported natively. Whenever you come up with such a feature, I can contribute a reactive-streams implementation on top of it. WDYT?
I'm reviewing the specifications now. I'll let you know shortly.
@vietj @pfxuan I will implement the Reactive Streams specification. Please allow a week or so for me to cleanly refactor some of the internals.
@bhamiltoncx As I get closer, I may ask for your input. In particular, NuProcess does not elegantly implement back-pressure -- though I have a reasonably well-formed idea how to do it. What is missing from NuProcess is something that more closely mirrors the epoll/kqueue model that it is wrapping -- i.e., while there is wantWrite() to express intention to write to stdin, there is no corresponding wantRead() to express intention to read (or not read). stdout/stderr are basically a hose with no shutoff -- we expect the implementer of the NuProcessHandler to keep up, buffer, or otherwise discard the data the process is shoving at it.
This is the part that will change. I will attempt to do so in the least intrusive way possible to the API.
@brettwooldridge I'm so excited to receive your direct support!
Yeah, happy to help brainstorm this.
I think we'll want to generalize and extend the wantWrite() infrastructure to allow manipulating (and querying?) the paused / running state of each of the I/O streams.
I have completed the initial implementation of Reactive Streams in a branch called streams. The principal classes are in the com.zaxxer.nuprocess.streams package.
@pfxuan @vietj Would you mind creating some junit tests for com.zaxxer.nuprocess.streams? Take a look at NuStreamProcessBuilder as a place to start.
@bhamiltoncx There is a "substantial" (not really) change in the core API. NuProcess now requires expressing wants not only for STDIN, but also for STDOUT and STDERR. And similar to the previous NuProcessHandler.onStdinReady() callback, both onStdout() and onStderr() now have a boolean return value that expresses whether more data is desired. In order to maintain current behavior, the simplest implementation is to return !closed; from those methods. Because you probably use the codecs, you may not need to change anything on your side (Buck). But your review is helpful.
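To make the "boolean return value expresses whether more data is desired" idea concrete, here is a toy sketch. Everything in it is an assumption for illustration: StdoutHandler, ToyLoop, and the wantRead() method are hypothetical and are not taken from the streams branch, whose real signatures may differ. The loop only mimics one-shot read interest: after a callback returns false, nothing is delivered until interest is re-armed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only; not the NuProcess API. All names are hypothetical.
final class WantReadSketch {

    /** Hypothetical handler shape: return true to keep receiving stdout data. */
    interface StdoutHandler {
        boolean onStdout(ByteBuffer buffer, boolean closed);
    }

    /** Stand-in for an event loop with one-shot read interest. */
    static final class ToyLoop {
        private final Queue<byte[]> pending = new ArrayDeque<>();
        private final StdoutHandler handler;
        private boolean wantRead = true;
        int callbacks = 0;

        ToyLoop(StdoutHandler handler) {
            this.handler = handler;
        }

        /** Queues a chunk as if the child process had written it. */
        void offer(byte[] chunk) {
            pending.add(chunk);
        }

        /** Hypothetical counterpart of wantWrite(): re-arms read interest. */
        void wantRead() {
            wantRead = true;
        }

        /** Delivers queued chunks only while read interest is armed. */
        void run() {
            while (wantRead && !pending.isEmpty()) {
                callbacks++;
                wantRead = handler.onStdout(ByteBuffer.wrap(pending.poll()), false);
            }
        }
    }

    public static void main(String[] args) {
        boolean[] paused = {true};
        // The handler consumes each chunk, then says whether it wants more.
        ToyLoop loop = new ToyLoop((buf, closed) -> {
            buf.position(buf.limit());
            return !paused[0];
        });
        loop.offer(new byte[] {1});
        loop.offer(new byte[] {2});
        loop.offer(new byte[] {3});

        loop.run();
        System.out.println(loop.callbacks); // 1: the handler asked for no more data

        paused[0] = false;
        loop.wantRead();
        loop.run();
        System.out.println(loop.callbacks); // 3: the remaining chunks were delivered
    }
}
```

A handler that always returns !closed, as suggested above, would never pause in this model, which corresponds to keeping the current behavior.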
Oh, and the streams branch does not currently run on Windows (haven't even looked at it yet). I will need to add support in Windows to mimic the "oneshot" events that epoll and kqueue provide.
@brettwooldridge Good job, I will have a look soon. I see that the implementation itself does not need the reactive-streams lib out of the box (i.e. one can use it without the reactive-streams lib). Would you mind declaring the reactive-streams library as optional=true?
No problem on the optional dependency.
@brettwooldridge So awesome! I'll look at it and see if I can do something.
@brettwooldridge I cannot find where rule 3.10 of the specification is implemented in your NuStreamSubscription#request(). It looks like 3.10 requires a synchronous call to the subscriber's onNext().
@pfxuan Please see rfc2119.
3.10 states: "While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s)."
From RFC 2119: "MAY: This word, or the adjective "OPTIONAL", mean that an item is truly optional. ..."
@brettwooldridge That was a cool response! I'm trying to run a compatibility test on NuStreamPublisher, but I'm having difficulty defining the number of elements when constructing the Publisher. If we don't explicitly call the Subscriber's onNext() from the Subscription's request(), how can the Publisher track the state of its message queue and signal onComplete when the queue becomes empty?
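For reference, one common way to handle this is for request(n) to only record demand, with a separate drain step doing the delivery. The sketch below is a minimal, single-threaded illustration and is not how NuStreamSubscription is implemented. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones; QueueSubscription and Collector are hypothetical names, and the fixed queue stands in for process output. Here the drain happens to run synchronously inside request(), which rule 3.10 permits but does not require; an event-loop thread could run the same drain instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Minimal single-threaded sketch; all class names are hypothetical.
final class DemandSketch {

    /** Subscription over a fixed queue: request() records demand, drain() delivers. */
    static final class QueueSubscription<T> implements Flow.Subscription {
        private final Queue<T> items;
        private final Flow.Subscriber<? super T> subscriber;
        private long demand;
        private boolean draining;
        private boolean done;

        QueueSubscription(Queue<T> items, Flow.Subscriber<? super T> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) { // non-positive requests are signalled as an error
                done = true;
                subscriber.onError(new IllegalArgumentException("n must be > 0"));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            drain();
        }

        @Override
        public void cancel() {
            done = true;
        }

        private void drain() {
            if (draining) return; // a re-entrant request() from onNext only adds demand
            draining = true;
            try {
                while (!done && demand > 0 && !items.isEmpty()) {
                    demand--;
                    subscriber.onNext(items.poll());
                }
                if (!done && items.isEmpty()) { // source exhausted
                    done = true;
                    subscriber.onComplete();
                }
            } finally {
                draining = false;
            }
        }
    }

    /** Test subscriber that records what it receives. */
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;
        Throwable error;

        @Override public void onSubscribe(Flow.Subscription s) { }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Collector<String> collector = new Collector<>();
        QueueSubscription<String> subscription =
                new QueueSubscription<>(new ArrayDeque<>(List.of("a", "b", "c")), collector);

        subscription.request(2);
        System.out.println(collector.received + " " + collector.completed); // [a, b] false

        subscription.request(5);
        System.out.println(collector.received + " " + collector.completed); // [a, b, c] true
    }
}
```

For a live process the completion condition would presumably be the stream closing rather than a queue running empty, and the Publisher wiring (subscribe, onSubscribe) is omitted here for brevity.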