flux-core icon indicating copy to clipboard operation
flux-core copied to clipboard

libsubprocess: support stdin flow control

Open chu11 opened this issue 1 year ago • 3 comments

From #4572

Rather than adding write responses, some kind of credit based flow control scheme seems appropriate here. For example (just brainstorming):

  • change the sender so that it only sends up to credits bytes per channel before pausing (stopping for example the stdin fd watcher in > - flux-exec. This probably requires a callback to expose credits to the user? Like on_credit (int bytes)?
  • add a new exec response type for issuing credits for a given stream
  • each stream's receive buffer sends a credit type response initially for the full buffer size
  • then each time some data is taken out of the buffer, send some credits
  • enhancement: only send credit when a low water mark is reached

chu11 avatar Sep 19 '24 18:09 chu11

hmmmm,

hiccup 1. We can't guarantee how fast each subprocess launched by flux-exec reads data out of its buffer. Some may read from their stdin buffer faster than others. Simple solution is determine the minimum amount that any of the subprocesses can take and only send that much to each one. But now we're bottle necked by the slowest subprocess, and it could be a hang if one of them is stuck.

        p = zlist_first (subprocesses);                                                                                                     
        while (p) {                                                                                                                         
            if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT                                                                           
                || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {                                                                  
                if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)                                                                      
                    log_err_exit ("flux_subprocess_write");                                                                                 
            }                                                                                                                               
            p = zlist_next (subprocesses);                                                                                                  
        }                      

sounds acceptable? As I ponder this, it seems that the pros simply outweigh the cons in this scenario.

chu11 avatar Sep 20 '24 20:09 chu11

That seems expected to me since the consumer is in charge of the rate of date transfer.

garlick avatar Sep 20 '24 20:09 garlick

See flux-framework/rfc#427 for a proposed protocol.

garlick avatar Sep 24 '24 23:09 garlick