
Stdout/stderr pause/resume

Open vietj opened this issue 10 years ago • 59 comments

There is no way to pause/resume the standard output/error when the NuProcessHandler cannot keep up with the buffers.

It would be a useful addition for the Vert.x 3 integration https://github.com/vietj/vertx-childprocess

Are there plans to implement such a feature?

vietj, Nov 02 '15 19:11

@vietj I haven't tried it, but this should work...

If you simply don't read any data when you are called back, the pipe will fill up and the child process itself will block. You would need to keep track of the fact that you have pending reads, and later read from the ByteBuffer to create room in the pipe. That will unblock the child process.
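A minimal sketch of this suggestion, assuming a callback shaped like `NuProcessHandler.onStdout(ByteBuffer buffer, boolean closed)`. The class `PausableStdoutSink` and its `pause()`/`resume()` methods are hypothetical, not part of NuProcess. While paused, the callback leaves the buffer's position untouched, so the bytes stay unread. The sketch ignores threading: `resume()` is assumed to run on the same thread that delivers the callbacks.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stdout sink sketching "pause by not reading".
 * onStdout mirrors NuProcessHandler's shape, but this class is standalone
 * and is NOT part of the NuProcess API.
 */
final class PausableStdoutSink {
    private final ByteArrayOutputStream received = new ByteArrayOutputStream();
    private boolean paused;
    private ByteBuffer pending; // buffer left unread while paused

    void pause() { paused = true; }

    /** Called with a buffer in read mode; `closed` is ignored in this sketch. */
    void onStdout(ByteBuffer buffer, boolean closed) {
        if (paused) {
            pending = buffer; // position untouched: the data stays in the buffer
            return;
        }
        drain(buffer);
    }

    /** Resumes consumption; assumes it runs on the callback thread. */
    void resume() {
        paused = false;
        if (pending != null) {
            drain(pending);
            pending = null;
        }
    }

    private void drain(ByteBuffer buffer) {
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk); // advances position to limit: everything consumed
        received.write(chunk, 0, chunk.length);
    }

    String received() {
        return new String(received.toByteArray(), StandardCharsets.US_ASCII);
    }
}
```

Leaving `position` unchanged is what keeps the data in the buffer; how and when the buffer gets handed back is exactly what the rest of the thread works out.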

Let me know if it works.

brettwooldridge, Nov 03 '15 01:11

OK, you mean I leave the ByteBuffer untouched. I will let you know.

vietj, Nov 03 '15 06:11

So that would work to pause the stream; however, how do I unpause it and request data again? I.e., how do I get a new callback to drain the pipe?

vietj, Nov 03 '15 06:11

You won't get called back again. Presumably you somehow know when you want to pause and resume, so your callback will need to retain a reference to the ByteBuffer and you will need some other thread or timer to drain the buffer.

It's not exactly clear to me why vertx would need to suspend a stream in the first place...

brettwooldridge, Nov 03 '15 08:11

I see, that sounds doable and I will try.

Vert.x would do that to propagate flow control to another stream; see this short part of the docs: http://vertx.io/docs/vertx-core/java/#streams
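The flow-control pattern in the linked Vert.x docs pauses a source when the destination's write queue is full and resumes it from a drain handler. The sketch below reproduces that pattern without depending on Vert.x; `PausableSource`, `BoundedSink` and `Pump` are hypothetical stand-ins rather than the Vert.x API, and the drain condition used here (any free slot) is a simplification.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical pausable source, e.g. a child process's stdout. */
final class PausableSource {
    private boolean paused;
    boolean isPaused() { return paused; }
    void pause() { paused = true; }
    void resume() { paused = false; }
}

/** Hypothetical bounded destination with a drain callback. */
final class BoundedSink {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxSize;
    private Runnable drainHandler = () -> {};

    BoundedSink(int maxSize) { this.maxSize = maxSize; }

    void write(String item) { queue.addLast(item); }
    boolean writeQueueFull() { return queue.size() >= maxSize; }
    void drainHandler(Runnable handler) { drainHandler = handler; }

    /** Simulates the consumer taking one item; fires the drain handler when there is room. */
    String take() {
        String item = queue.pollFirst();
        if (!writeQueueFull()) drainHandler.run();
        return item;
    }
}

/** Forwards items and propagates back-pressure from sink to source. */
final class Pump {
    private final PausableSource source;
    private final BoundedSink sink;

    Pump(PausableSource source, BoundedSink sink) {
        this.source = source;
        this.sink = sink;
        sink.drainHandler(source::resume); // room again: let the source produce
    }

    /** Called for every item the source produces while not paused. */
    void onItem(String item) {
        sink.write(item);
        if (sink.writeQueueFull()) source.pause(); // destination full: stop the source
    }
}
```

For a child process, "pausing the source" is the part NuProcess would have to provide, which is what this issue asks for.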

vietj, Nov 03 '15 08:11

Hi,

I'm looking further at this, and the Javadoc says: "You do not own the ByteBuffer provided to you. You should not retain a reference to this buffer." That seems to contradict the advice to retain a reference to the ByteBuffer.

vietj, Nov 06 '15 08:11

@vietj Oops, I apologize, the Javadoc is out of date. It used to be that buffers were reused across processes, but after refactors by @bhamiltoncx this is no longer true. Each process instance has its own stdout, stderr, and stdin ByteBuffer instances.

brettwooldridge, Nov 06 '15 08:11

@brettwooldridge Does NuProcess have any plans to implement Reactive Streams?

pfxuan, Nov 06 '15 09:11

@brettwooldridge How can we ensure that the buffer is not modified concurrently by the NuProcess event loop and a possible drain by another thread?

vietj, Nov 06 '15 09:11

@brettwooldridge Also, when I access the buffer outside the NuProcess event loop, I get buffer.remaining() == 0 (the buffer is full), which means the buffer was flipped. So when using it outside, should we flip this buffer again?
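What `buffer.remaining()` reports depends on whether the buffer is in write mode (free space left) or read mode (unread bytes left). A standalone demonstration with plain `java.nio.ByteBuffer`, no NuProcess involved; `RemainingDemo` is an illustrative name.

```java
import java.nio.ByteBuffer;

/** Standalone demo of what remaining() means around flip() and compact(). */
final class RemainingDemo {
    /** Returns the remaining() values after filling, after flipping, and after compacting unread data. */
    static int[] run() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put(new byte[8]);               // write mode: fill to capacity
        int afterFill = buf.remaining();    // free space left: 0 ("full")

        buf.flip();                         // read mode: position = 0, limit = 8
        int afterFlip = buf.remaining();    // unread bytes: 8

        buf.compact();                      // nothing was read, so all 8 bytes are kept
        int afterCompact = buf.remaining(); // write mode again, still no room: 0
        return new int[] {afterFill, afterFlip, afterCompact};
    }
}
```

So `remaining() == 0` means "full" in write mode but "empty" in read mode, and `compact()` with nothing consumed leaves a full buffer in write mode.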

vietj, Nov 06 '15 09:11

@pfxuan This is the first that I've heard of the Reactive Streams initiative, but NuProcess would certainly welcome any contribution. If someone were to make such an effort, I would recommend the implementation go into a com.zaxxer.nuprocess.streams or com.zaxxer.nuprocess.reactive package.

brettwooldridge, Nov 06 '15 09:11

@vietj Can you post your NuProcessHandler implementation, either here or linked as a gist?

brettwooldridge, Nov 06 '15 09:11

@brettwooldridge Sounds like a good plan! If @vietj can do this, the integration is going to become very natural: http://vertx.io/docs/vertx-reactive-streams/java/

pfxuan, Nov 06 '15 09:11

@brettwooldridge here is the handler https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/main/java/io/vertx/ext/childprocess/impl/ChildProcessImpl.java

here is a test https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/test/java/io/vertx/ext/childprocess/SpawnTest.java#L214

If you run this test (on OS X at least), the buffer size is 0 after resume.

vietj, Nov 06 '15 09:11

@pfxuan That's possible; however, it would be best to first come up with a solution that works with the current API :-) (unless I'm not using the current API correctly).

vietj, Nov 06 '15 09:11

@vietj Agree! I'm looking forward to seeing the first prototype. This integration is very useful.

pfxuan, Nov 06 '15 09:11

@vietj Ok, looking at the code, this is going to be a little tricky. ByteBuffer is not a thread-safe class, so having multiple threads (NuProcess's and yours) accessing the buffer (at the same time) is going to be trouble.
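One way to avoid sharing the ByteBuffer across threads is to copy bytes out of it on the event-loop thread into a bounded, thread-safe queue, leaving whatever does not fit in the buffer so back-pressure still builds. A sketch under those assumptions; `BoundedHandoff` and its methods are hypothetical, and the callback is simplified to take only the buffer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical handoff: only the event-loop thread touches the ByteBuffer.
 * Bytes are copied into a bounded queue; what does not fit stays in the buffer.
 */
final class BoundedHandoff {
    private final BlockingQueue<byte[]> queue;
    private final int chunkSize;

    BoundedHandoff(int maxChunks, int chunkSize) {
        this.queue = new ArrayBlockingQueue<>(maxChunks);
        this.chunkSize = chunkSize;
    }

    /** Runs on the event-loop thread only (the single producer). */
    void onStdout(ByteBuffer buffer) {
        while (buffer.hasRemaining() && queue.remainingCapacity() > 0) {
            byte[] chunk = new byte[Math.min(chunkSize, buffer.remaining())];
            buffer.get(chunk);
            // Cannot fail: capacity was checked and consumers only ever free slots.
            queue.offer(chunk);
        }
    }

    /** Runs on any consumer thread; returns null when nothing is queued. */
    byte[] poll() { return queue.poll(); }
}
```

This does not address the CPU-spinning concern raised below: while the queue is full, the event loop would still be told that data is available.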

NuProcess is going to compact the buffer after calling your handler. The contract is that the handler should read as much as it can. If you want to create back-pressure, to block the spawned process from writing more data, you need to let that buffer fill up. Which you seem to be doing. If you flip the buffer, the back-pressure goes away, so don't do that.
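A simplified, single-threaded simulation of the cycle described here: fill the buffer from the pipe, flip, call the handler, compact. `CompactLoopSim` is hypothetical and is not NuProcess's event loop; the "pipe" is just a byte array, and the loop can only pull in as much as the buffer's free space allows.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

/** Hypothetical simulation of a read / dispatch / compact loop (not NuProcess code). */
final class CompactLoopSim {
    /** Runs `rounds` iterations and returns how many pipe bytes the loop pulled in. */
    static int run(byte[] pipe, int capacity, int rounds, Consumer<ByteBuffer> handler) {
        ByteBuffer buf = ByteBuffer.allocate(capacity); // starts in write mode
        int pulled = 0;
        for (int i = 0; i < rounds; i++) {
            // "Read from the pipe": only as much as the free space allows.
            int n = Math.min(buf.remaining(), pipe.length - pulled);
            buf.put(pipe, pulled, n);
            pulled += n;
            buf.flip();          // read mode for the handler
            handler.accept(buf); // handler consumes some, all, or none
            buf.compact();       // keep unread bytes, back to write mode
        }
        return pulled;
    }
}
```

With a handler that consumes everything, a 100-byte pipe gets through a 16-byte buffer within 10 rounds; with a handler that consumes nothing, the loop stalls after the first 16 bytes, which is the point at which a real pipe would back up and block the child.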

If your handler is called, and remaining != 0, it is not safe to interact with the ByteBuffer from another thread. Once your handler is called with remaining == 0, it should be safe to interact with the ByteBuffer. Note, once remaining == 0, you may not ever get called again (but you may be called multiple times with remaining == 0).

My concern now is, without a change in NuProcess, the NuProcess thread may spin -- consuming lots of CPU.

brettwooldridge, Nov 06 '15 10:11

@vietj The more I think about it, the more I think NuProcess will need to support a suspend()/resume() of some sort. The epoll (Linux), kqueue (Mac), and IO Completion Port (Windows) triggers need to be deregistered so that they aren't constantly firing that data is available -- but nothing is reading it -- causing NuProcess to spin and consume CPU.
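The spinning problem can be modeled with a level-triggered readiness loop (the default epoll behavior), which reports a readable descriptor on every poll until its data is consumed. In this hypothetical model (`SpinModel` is illustrative, not NuProcess code), a paused consumer either stays registered and wakes up for nothing, or is deregistered and costs no wakeups until it resumes.

```java
/** Hypothetical model of a level-triggered poll loop with a paused consumer. */
final class SpinModel {
    /**
     * Runs `polls` iterations with the consumer paused before iteration `resumeAt`.
     * Returns the number of wakeups that made no progress.
     */
    static int wastedWakeups(boolean deregisterWhilePaused, int polls, int resumeAt) {
        boolean dataPending = true; // the child wrote data that nobody has read
        int wasted = 0;
        for (int i = 0; i < polls; i++) {
            boolean paused = i < resumeAt;
            boolean registered = !(paused && deregisterWhilePaused);
            if (!registered || !dataPending) continue; // loop sleeps: no wakeup
            if (paused) {
                wasted++;            // woke up, but the handler read nothing
            } else {
                dataPending = false; // resumed handler drains the data
            }
        }
        return wasted;
    }
}
```

For 1000 polls with the consumer paused for the first 500, staying registered yields 500 wasted wakeups; deregistering yields none.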

brettwooldridge, Nov 06 '15 10:11

@brettwooldridge Indeed, using the ByteBuffer outside of the NuProcess event loop sounds like an abuse, and pause/resume should be supported natively. Whenever you come up with such a feature, I can contribute a reactive-streams implementation on top of it. WDYT?

vietj, Nov 06 '15 10:11

I'm reviewing the specifications now. I'll let you know shortly.

brettwooldridge, Nov 06 '15 10:11

@vietj @pfxuan I will implement the Reactive Streams specification. Please allow a week or so for me to cleanly refactor some of the internals.

@bhamiltoncx As I get closer, I may ask for your input. In particular, NuProcess does not elegantly implement back-pressure, though I have a reasonably well-formed idea how to do it. What is missing from NuProcess is something that more closely mirrors the epoll/kqueue model that it is wrapping, i.e., while there is wantWrite() to express the intention to write to stdin, there is no corresponding wantRead() to express the intention to read (or not read). stdout/stderr are basically a hose with no shutoff: we expect the implementer of the NuProcessHandler to keep up, buffer, or otherwise discard the data the process is shoving at it.

This is the part that will change. I will attempt to do so in the least intrusive way possible to the API.
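A minimal sketch of per-stream intent, generalizing the `wantWrite()` idea to the `wantRead()` notion described above. `StreamInterest`, `want()`, `pause()` and `isWanted()` are hypothetical names, not NuProcess's API; an event loop would consult `isWanted()` before (re)registering a descriptor.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical per-stream interest tracker (illustrative names only). */
final class StreamInterest {
    enum Stream { STDIN, STDOUT, STDERR }

    // Output streams start out wanted, matching the existing "hose" behavior.
    private final Set<Stream> wanted = EnumSet.of(Stream.STDOUT, Stream.STDERR);

    /** Express intent: like wantWrite() for STDIN, a "wantRead" for the outputs. */
    synchronized void want(Stream s) { wanted.add(s); }

    /** Withdraw intent; an event loop would then stop watching that descriptor. */
    synchronized void pause(Stream s) { wanted.remove(s); }

    synchronized boolean isWanted(Stream s) { return wanted.contains(s); }
}
```

Defaulting STDOUT and STDERR to wanted keeps handlers that never call `pause()` working as before.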

brettwooldridge, Nov 06 '15 15:11

@brettwooldridge I'm so excited to receive your direct support!

pfxuan, Nov 06 '15 16:11

Yeah, happy to help brainstorm this.

I think we'll want to generalize and extend the wantWrite() infrastructure to allow manipulating (and querying?) the paused / running state of each of the I/O streams.


bhamiltoncx, Nov 06 '15 16:11

I have completed the initial implementation of Reactive Streams in a branch called streams. The principal classes are in the com.zaxxer.nuprocess.streams package.

@pfxuan @vietj Would you mind creating some junit tests for com.zaxxer.nuprocess.streams? Take a look at NuStreamProcessBuilder as a place to start.

@bhamiltoncx There is a "substantial" (not really) change in the core API. NuProcess now requires expressing wants not only for STDIN, but also for STDOUT and STDERR. And similar to the previous NuProcessHandler.onStdinReady() callback, both onStdout() and onStderr() now have a boolean return value that expresses whether more data is desired. To maintain the current behavior, the simplest implementation is to return !closed; from those methods. Because you probably use the codecs, you may not need to change anything on your side (Buck). But your review is helpful.
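The boolean-returning callback can be sketched as follows. Only the `onStdout(ByteBuffer, boolean)` shape and the `return !closed;` idiom come from the comment above; `WantDispatch` and its delivery loop are hypothetical, and treating the last chunk as the one that arrives with `closed == true` is a simplifying assumption.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical dispatcher driving a boolean-returning stdout callback. */
final class WantDispatch {
    interface StdoutHandler {
        /** Returns true if more stdout data is desired. */
        boolean onStdout(ByteBuffer buffer, boolean closed);
    }

    /** Delivers chunks until the handler stops wanting data; returns the number delivered. */
    static int deliver(List<byte[]> chunks, StdoutHandler handler) {
        int delivered = 0;
        for (int i = 0; i < chunks.size(); i++) {
            boolean closed = i == chunks.size() - 1; // assumption: EOF comes with the last chunk
            delivered++;
            boolean wantMore = handler.onStdout(ByteBuffer.wrap(chunks.get(i)), closed);
            if (!wantMore) break; // stop dispatching; a real loop would also stop watching the fd
        }
        return delivered;
    }
}
```

A handler returning `!closed` receives every chunk, matching the old behavior; a handler returning `false` stops delivery after the first chunk.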

Oh, and the streams branch does not currently run on Windows (haven't even looked at it yet). I will need to add support in Windows to mimic the "oneshot" events that epoll and kqueue provide.
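The "oneshot" semantics can be modeled in a few lines: after one notification the registration is disarmed until the handler explicitly asks for more. With `EPOLLONESHOT`, re-arming is done through `epoll_ctl` with `EPOLL_CTL_MOD`; with kqueue's `EV_ONESHOT`, the event is deleted after delivery and must be added again. `OneShotRegistration` is a hypothetical model, not NuProcess code.

```java
/** Hypothetical model of one-shot readiness notification. */
final class OneShotRegistration {
    private boolean armed = true;
    private int notifications;

    /** The OS reports readiness; only an armed registration notifies, then disarms. */
    void ready() {
        if (armed) {
            armed = false;
            notifications++;
        }
    }

    /** The handler asks for more, e.g. its callback returned true. */
    void rearm() { armed = true; }

    int notifications() { return notifications; }
}
```

Repeated readiness while disarmed produces no extra callbacks, which is what keeps a paused stream from spinning.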

brettwooldridge, Nov 10 '15 05:11

@brettwooldridge Good job, I will have a look soon. I see that the implementation itself does not need the reactive-streams lib out of the box (i.e., one can use it without the reactive-streams lib): would you mind declaring the reactive-streams library as optional=true?

vietj, Nov 10 '15 06:11

No problem on the optional dependency.

brettwooldridge, Nov 10 '15 07:11

@brettwooldridge So awesome! I'll look at it and see if I can do something.

pfxuan, Nov 10 '15 12:11

@brettwooldridge I cannot find an implementation of rule 3.10 of the specification in your NuStreamSubscription#request(). It looks like 3.10 requires a synchronous call to the subscriber's onNext().

pfxuan, Nov 11 '15 06:11

@pfxuan Please see RFC 2119.

3.10 states:

While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s).

From RFC 2119:

MAY   This word, or the adjective "OPTIONAL", mean that an item is truly optional. ...
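Because synchronous delivery from `request(n)` is optional, a Subscription can simply record demand and let the producing thread deliver later. A sketch using the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror Reactive Streams; `DeferredSubscription`, `offer()` and `drain()` are hypothetical names. It also rejects non-positive requests (rule 3.9) and caps accumulated demand at `Long.MAX_VALUE` (rule 3.17).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical Subscription whose request(n) only records demand. */
final class DeferredSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> subscriber;
    private final AtomicLong demand = new AtomicLong();
    private final Queue<T> pending = new ArrayDeque<>(); // touched only by the producer thread
    private volatile boolean cancelled;

    DeferredSubscription(Flow.Subscriber<? super T> subscriber) { this.subscriber = subscriber; }

    @Override public void request(long n) {
        if (n <= 0) { // rule 3.9: non-positive requests are signalled as an error
            cancelled = true;
            subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
            return;
        }
        // Add demand, saturating at Long.MAX_VALUE; note: no onNext from here.
        demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
    }

    @Override public void cancel() { cancelled = true; }

    /** Producer side (e.g. an I/O thread): queue an item for later delivery. */
    void offer(T item) { pending.add(item); }

    /** Producer side: deliver as many queued items as the current demand allows. */
    void drain() {
        while (!cancelled && !pending.isEmpty() && demand.get() > 0) {
            demand.decrementAndGet();
            subscriber.onNext(pending.poll());
        }
    }
}
```

Here `request(2)` delivers nothing by itself; the items arrive on the next `drain()` from the producer side.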

brettwooldridge, Nov 11 '15 06:11

@brettwooldridge Cool response! I'm trying to run a compatibility test on NuStreamPublisher, but I'm having difficulty defining the number of elements when constructing the Publisher. If we don't explicitly call the Subscriber's onNext() from the Subscription's request(), how can the Publisher control the state of its message queue and also signal onComplete when the queue becomes empty?
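For a Publisher with a fixed number of elements, the synchronous option that rule 3.10 permits is the simplest: deliver up to n items inside `request(n)` and signal `onComplete` as soon as the source is exhausted. A sketch using the JDK `Flow` interfaces (Java 9+); `IterablePublisher` is hypothetical, and it omits the rule 3.9 check and the recursion bound (rule 3.3) that a production Publisher needs.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical finite publisher that delivers synchronously from request(n). */
final class IterablePublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    IterablePublisher(List<T> items) { this.items = items; }

    @Override public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = items.iterator(); // each subscriber gets its own cursor
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                // Synchronous delivery: up to n items, right here on the caller's thread.
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                if (!done && !it.hasNext()) { // source exhausted: terminal signal, exactly once
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }
}
```

Requesting 2 of three items delivers `a` and `b`; a later `request(5)` delivers `c` and then `onComplete`.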

pfxuan, Nov 11 '15 06:11