keepAlive option

Open cphoover opened this issue 10 years ago • 6 comments

Is there a way to keep a consuming stream alive even if one of its feeders sends an end-of-stream marker? In my use case I'm dealing with multiple child process stdout streams, each of which ends when its process dies.

For example:

var _ = require("highland");

var mainStr = _().map(function(dat){
   return "[prefix] - " + dat;
});
mainStr.pipe(process.stdout);

var str1 = _();
str1.pipe(mainStr);
var str2 = _();
str2.pipe(mainStr);

str1.write("here is a message 1");
str2.write("here is a message 2");
str1.write(_.nil);
str2.write("this message will cause an error to be thrown");

// results in error thrown: 
// Error: Can not write to stream after nil

Is there a way to keep the stream alive but still have it clean up after itself when feeders die? For example, a keepAlive method would be useful:

var mainStr = _().keepAlive().map(function(dat){
   return "[prefix] - " + dat;
});

cphoover avatar Apr 09 '15 12:04 cphoover

var _ = require("highland");

var str1 = _();
var str2 = _();

_([str1, str2]).merge()
    .map(function(dat){
       return "[prefix] - " + dat;
    }).pipe(process.stdout);

str1.write("here is a message 1");
str2.write("here is a message 2");
str1.write(_.nil); // ends str1 only; the merged stream stays open for str2
str2.write("this message is still delivered");

You can also use parallel or sequence depending on your needs.

vqvu avatar Apr 09 '15 16:04 vqvu

Hmm, the problem is that your example assumes I have all of the streams ahead of time. I have to merge readable streams into the main stream asynchronously.

A better example would be:

    var _ = require("highland");
    var exec = require("child_process").exec;
    var cmd = "somecommand";

    var mainStr = _().keepAlive().map(function(dat){
       return "[prefix] - " + dat;
    });

    mainStr.pipe(process.stdout);

    this.interval = setInterval(function(){
        // need to merge exec(cmd).stdout into mainStr
        exec(cmd).stdout.pipe(mainStr);
    }, 10 * 1000);

cphoover avatar Apr 09 '15 17:04 cphoover

The problem with your proposed keepAlive is that there's no way to know when all sources are done. Sources are merged asynchronously, so how do you know how many sources there are?

Can you do it like this?

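var mainStr = _(function (push, next) {
    // The generator runs once; each tick pushes a new stdout stream for merge().
    var timer = setInterval(function () {
        push(null, _(exec(cmd).stdout));
        if (done()) { // Figure out somehow
            clearInterval(timer); // stop pushing once the stream has ended
            push(null, _.nil);
        }
    }, 10 * 1000); // same interval as the earlier example
}).merge().map(...);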
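
The done() check above is left open. One possible way to decide it is to count the sources that are still live and to end the main stream only once no more will be added and all of them have finished. The sketch below uses Node's core stream.PassThrough rather than Highland, and createFanIn, add, and close are hypothetical names invented for illustration, not part of either library.

```javascript
const { PassThrough } = require("stream");

// Hypothetical helper: a fan-in whose main stream ends only after close()
// has been called AND every attached source has ended.
function createFanIn() {
  const main = new PassThrough({ objectMode: true });
  let active = 0;     // sources attached and not yet ended
  let closed = false; // true once no further sources will be added

  function maybeEnd() {
    if (closed && active === 0) main.end();
  }

  return {
    main,
    add(source) {
      if (closed) throw new Error("fan-in is closed");
      active += 1;
      source.once("end", () => { active -= 1; maybeEnd(); });
      // end: false stops this source's end from closing main.
      source.pipe(main, { end: false });
    },
    close() { closed = true; maybeEnd(); },
  };
}

// Usage: two sources, one of which finishes before close() is called.
const fanIn = createFanIn();
const out = [];
fanIn.main.on("data", (d) => out.push(d));
fanIn.main.on("end", () => out.push("<end>"));

const a = new PassThrough({ objectMode: true });
const b = new PassThrough({ objectMode: true });
fanIn.add(a);
fanIn.add(b);
a.end("from a");
fanIn.close();   // no more sources will be added; main still waits for b
b.end("from b");
```

A daemon that should run forever would simply never call close(), so the counter only matters for an orderly shutdown. The sketch listens for "end" only; a source that is destroyed without ending would need extra handling.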

vqvu avatar Apr 09 '15 18:04 vqvu

vqvu, in my use case I want it to stay alive forever; I'm creating a daemon. My assumption is that when each stream ends it will be cleaned up automatically (i.e., its references will be removed so it can be gc'd). I have not tested this, though.

cphoover avatar Apr 09 '15 19:04 cphoover

Just don't push nil if you want it to stay alive forever. The gc should happen correctly, I think. If it doesn't, then that's a bug in merge.
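
For comparison, the same "never end the main stream" idea exists in Node's core streams as the end: false option of pipe. This is a minimal sketch with stream.PassThrough, not Highland code; attach, feederA, and feederB are hypothetical names, and the prefixing is done in a plain data handler instead of a map step.

```javascript
const { PassThrough } = require("stream");

// Long-lived destination, standing in for mainStr in the thread.
const main = new PassThrough({ objectMode: true });

// Collect prefixed output in an array so the result is easy to inspect.
const seen = [];
main.on("data", (msg) => seen.push("[prefix] - " + msg));

// Hypothetical helper: attach a feeder without letting its end close main.
function attach(feeder) {
  feeder.pipe(main, { end: false });
  return feeder;
}

const feederA = attach(new PassThrough({ objectMode: true }));
const feederB = attach(new PassThrough({ objectMode: true }));

feederA.write("here is a message 1");
feederB.write("here is a message 2");
feederA.end();                        // feederA is finished; main stays open
feederB.write("here is a message 3"); // still accepted and delivered
```

Because main is never ended, new feeders can be attached at any later time, for example from a setInterval callback.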

vqvu avatar Apr 09 '15 22:04 vqvu

Thanks for the tips.

cphoover avatar Apr 10 '15 01:04 cphoover