
back-propagation of _.nil

Open jeromew opened this issue 10 years ago • 43 comments

Hello,

I have been trying the following scenario:

var resource = open()
var s = _(function(push, next) {
    // use resource to push elements, and _.nil when the resource is empty
})
.consume(function(err, x, push, next) {
    if (err === null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      resource.close();
      push(null, x);
    }
    else {
      push(null,x);
    }
})

I open a resource, consume from it, and wait for _.nil to close the resource.

This works well if I consume all the tokens with

s.resume();

Now if I do

s.take(10).resume();

The resource is never closed, because take seems to send _.nil downstream without informing upstream that it will not be pulled from again.

You might say that I could close the resource by catching the _.nil after take(10), but that is not what I want, because I do not want the downstream code to hold a reference to the resource.

Would it make sense to have back-propagation of _.nil or a mechanism to inform upstream streams that they will not be pulled from again ?

In node.js streams, when you do

s1.pipe(s2)

when s2 emits a 'close' event, s1 automatically unpipes from s2, for example, so you have a way to detect that nothing will be pulled again.

https://github.com/joyent/node/blob/master/lib/_stream_readable.js#L568

jeromew avatar Nov 27 '14 14:11 jeromew

Do you mean:

    if (err !== null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      resource.close();
      push(null, x);
    }
    else {
      push(null,x);
      next();
    }

LewisJEllis avatar Nov 27 '14 17:11 LewisJEllis

Yes, sorry for the typo. Here is an example showing the "bug":

var _ = require('./lib/index.js')

_([1,2,3,4,5])
.consume(function(err, x, push, next) {
    if (err !== null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      console.log('nil')
      push(null, x);
    }
    else {
      push(null,x);
      next();
    }
})
.take(2)
.resume()

I would have expected 'nil' to be logged, but it is not. The same code with take(2) placed before the consume does log 'nil'.
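The behavior can be illustrated with a minimal pull-model sketch in plain JavaScript. This is not highland's implementation, just an analogy for how take ends its output without telling its source:

```javascript
// A minimal pull-model sketch (plain JavaScript, not highland's actual
// internals) of why the upstream consume never sees _.nil: take(n) emits
// its own nil after n values and simply stops pulling from its source.
var nil = {};  // stands in for _.nil

function fromArray(xs) {
  var i = 0;
  return function pull(cb) { cb(i < xs.length ? xs[i++] : nil); };
}

function take(source, n) {
  return function pull(cb) {
    if (n-- > 0) source(cb);  // forward one value from the source
    else cb(nil);             // end downstream; the source is never told
  };
}

var logged = [];
var s = take(fromArray([1, 2, 3, 4, 5]), 2);
(function drain() {
  s(function (x) {
    if (x !== nil) { logged.push(x); drain(); }
  });
})();
// logged is [1, 2]; fromArray never reached its own nil
```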

jeromew avatar Nov 27 '14 17:11 jeromew

I see, but when take passes on nil, there's no guarantee that the source stream will never be pulled from again since we can have multiple consumers.

var _ = require('./lib/index');

var s = _([1,2,3,4,5])
.consume(function(err, x, push, next) {
  if (err !== null) {
    push(err);
    next();
  }
  else if (x === _.nil) {
    console.log('nil')
    push(null, x);
  }
  else {
    push(null,x);
    next();
  }
});

s.take(2).toArray(function (xs) {
  console.log(xs)
  s.take(3).toArray(function (xs) {
    console.log(xs);
    s.resume();
  });
});

LewisJEllis avatar Nov 27 '14 18:11 LewisJEllis

Should we even allow streams to be pulled from again?

This behavior results in situations like

var s = _([1, 2, 3, 4, 5]);
s.fork().take(1).toArray(_.log);
// => prints [1]

s.fork().take(2).toArray(_.log);
// => prints nothing.

The shared backpressure causes the second fork to never get the second element, since the first fork could be consumed later on.

vqvu avatar Nov 29 '14 23:11 vqvu

I've been defining all of my forks before consuming them, e.g.:

var s = _([1, 2, 3, 4, 5]);
var sFork1 = s.fork();
var sFork2 = s.fork();

sFork1.take(1).toArray(_.log);
// => prints [1]

sFork2.take(2).toArray(_.log);
// => prints [1, 2]

But I'd be interested in hearing best practices for this case. It seems like my example isn't the most functional way of handling this.


ariutta avatar Nov 30 '14 20:11 ariutta

@ariutta Can you give a better example of what you're trying to do? What you seem to be asking doesn't have anything to do with functional vs. not functional.

If you're just asking about whether you should assign your forks to a variable, I think the answer is "no unless you need to reference your fork later on." Otherwise, just take advantage of the chaining API instead of creating superfluous variables that you never really need.

I don't think I've ever run into a situation where I needed a reference to my fork later on.

vqvu avatar Nov 30 '14 22:11 vqvu

@vqvu, your second paragraph answered my question. Thanks!

ariutta avatar Dec 02 '14 19:12 ariutta

@vqvu the way I currently use highland is:

  • first, build a graph
  • then, pump

Ideally, each "transform" would have its own setup/teardown plus an awareness of the global graph it is inserted into. In my example above, I thought there could be a sort of back-propagation of the fact that the transforms will never be pulled from again.

But that is very fuzzy now that I understand @LewisJEllis's point (and yours) that streams can be re-used (I was under the mistaken impression that they had to be discarded once downstream decided everything was over).

I'll let you close this issue as it seems that my question is not a real issue.

jeromew avatar Dec 05 '14 16:12 jeromew

I've been looking at using this library in a project and ran into this exact problem: there's no safe time to clean up a resource.

An alternative to forcing streams to be one time use only might be to take the approach bacon.js does.

That is, have the user provide a function that allocates the resource and gets values, and a function to clean up the resource. Make it explicit that their "subscribe" function may be called multiple times (i.e. if a stream is stopped and then re-used, the "subscribe" function would be called again).

Disclaimer: I haven't used bacon.js quite yet (just read some docs / source), as I wanted to try highland first.

ibash avatar Jan 03 '15 09:01 ibash

I also ran into a case sort of like this recently and thought about this a bit, esp. the question "Should we even allow streams to be pulled from again?"

I haven't managed to make a strong use case for the current way of allowing it - the example I gave above was just to make a point, and I can pull off the same thing using forks anyway if we disallow multiple pulls; this piece from my example above:

s.take(2).toArray(function (xs) {
  console.log(xs)
  s.take(3).toArray(function (xs) {
    console.log(xs);
    s.resume();
  });
});

could become:

s.fork().take(2).toArray(_.log)
// now we know the first fork is all done
s.fork().drop(2).take(3).toArray(_.log)

(assuming .drop makes its way in, of course)

Does anyone have a good example of where multiple consumptions/thunks on the same stream is useful/necessary? /cc @vqvu, @caolan @jeromew

I like the possibility that a stream should effectively "be discarded when downstream decide(s) everything (is) over" where "downstream decides everything is over" = "a thunk happens". In other words - can we safely add the restriction that every stream (or fork) only ever has one thunk? Then, once that thunk happens, we can back-prop _.nil and thus allow for teardown of encapsulated resources.

Barring that possibility, the setup/teardown handler idea is a good next option, but it feels weird to me for some reason that I haven't quite put my finger on. See also: relevant discussion in #161 on how .done might support either invoking the teardown handler or triggering the back-propagation.

LewisJEllis avatar Jan 03 '15 11:01 LewisJEllis

I'm not a fan of allowing multiple thunks for a single stream for the exact reasons mentioned in this issue; I've never had to do it myself. So I would support changing that behavior to allow for back-prop of nil.

vqvu avatar Jan 03 '15 20:01 vqvu

Question about semantics. Will there be a way to differentiate between "values are no longer needed" and "this is the end of the stream, but continue to push values"? In the first case you'd want to clean up resources immediately, but in the second you'd probably want to wait. Let me know if an example use case is needed.



ibash avatar Jan 03 '15 20:01 ibash

I don't understand your latter case...how can you continue to push values after the end of the stream?

vqvu avatar Jan 03 '15 20:01 vqvu

In particular, I want to re-implement unix-sort using highland. The unix-sort library spawns the sort command and passes all the stream items to it. When it gets _.nil, indicating the end of items from the stream, it sends an EOF to the sort command so that everything can be sorted. After that, it sends the values downstream in sorted order.

I guess in this case it could clean up in the onDone. The caveat: if something happens and you no longer care about the results of the stream, there's no clear way to indicate "clean up and ignore the results".

In pseudo-code it would look like this:

var sort = null;  // hoisted so it persists across consume calls

stream.consume(function (error, value, push, next) {
    if (error) {
      // pass along errors
      push(error);
      return next();
    }

    if (!sort) {
      sort = initSort();
      sort.onDone(function () {
        // push the sorted values downstream, then end the stream
        sort.sortedValues.forEach(function (v) { push(null, v); });
        push(null, _.nil);
      });
    }

    if (value === _.nil) {
      // stream is finished, tell sort that it can start
      sort.send(EOF);
    } else {
      sort.send(value);
      next();
    }
});

ibash avatar Jan 03 '15 21:01 ibash

There is an open PR on sort : https://github.com/caolan/highland/pull/169

What benefit do you see in using unix-sort memory-wise? In both cases it seems to me that you need to have the whole thing in memory, just in case the last item becomes the first after sorting.

But you are right about the "cancel" pattern. Right now s.sort().take(1) will pull all the items into memory until a nil is emitted by s, and there is no way to cancel it. This might be problematic in some cases and will probably need some thought.
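A minimal sketch makes the buffering problem concrete (plain JavaScript; sortHandler is hypothetical, not highland's sort):

```javascript
// Sketch of why sort().take(1) holds everything: a sort transform can
// emit nothing until it has seen nil, so a downstream early end cannot
// reduce the buffering by itself.
var nil = {};  // stands in for _.nil
var buffered = [];

function sortHandler(x, emit) {
  if (x === nil) {
    // only now can anything be emitted, in sorted order
    buffered.sort(function (a, b) { return a - b; }).forEach(emit);
    emit(nil);
  } else {
    buffered.push(x);  // must hold every value until nil arrives
  }
}

var out = [];
[3, 1, 2, nil].forEach(function (x) {
  sortHandler(x, function (v) { if (v !== nil) out.push(v); });
});
// out is [1, 2, 3]; buffered held all values even if only one was wanted
```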

jeromew avatar Jan 03 '15 21:01 jeromew

@ibash OK, I get what you're saying. There will need to be a way to differentiate between getting a nil from upstream (so you can start your sort) and getting a nil from downstream (a "cancel").

Perhaps what we need is an onDone and onCancel for the two cases, respectively.

vqvu avatar Jan 04 '15 02:01 vqvu

@jeromew as discussed here unix sort makes temporary files so it doesn't have to hold everything in memory.

@vqvu exactly. I'm not sure of the best syntax for it, whether it should be a different value pushed to the consumer, like _.cancel, or something else.

ibash avatar Jan 04 '15 05:01 ibash

@vqvu you mean for onDone and onCancel to be handlers passed to .consume? So the .consume example from the docs would become something like:

var filter = function (f, source) {
    return source.consume({
        onDone: function (push, next) {
            push(null, _.nil);
        },
        handle: function (err, x, push, next) {
            if (err) {
                // pass errors along the stream and consume next value
                push(err);
                next();
            }
            else {
                // pass on the value only if the value passes the predicate
                if (f(x)) {
                    push(null, x);
                }
                next();
            }
        }
    });
};

My first thought was to use an additional special value (like _.cancel), but that would require modifying all the existing .consume implementations to handle that case. That might actually be a good thing, since some would treat it the same as _.nil and some would treat it differently - but it seems pretty boilerplatey for consumes that just go push(null, nil) in either case.

Another option might be to make the idea of a cancel live inside the idea of a nil - replace x === nil with _.isNil(x), which would return true in either the cancel or the nil case, and then have another way to determine if the nil is also a cancel. That way, consumes wouldn't have to care about the cancel case unless they behave differently for it.
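That option could look something like this sketch (the cancel sentinel, isNil, and isCancel are hypothetical; only _.nil exists in highland today):

```javascript
// Hypothetical sketch: cancel becomes a special kind of nil, so consume
// handlers that don't care about cancellation keep working unchanged.
var nil = {};                   // stands in for _.nil
var cancel = { cancel: true };  // a second, more specific end sentinel

function isNil(x) { return x === nil || x === cancel; }
function isCancel(x) { return x === cancel; }

// A consume handler that doesn't care about cancellation keeps working:
function handle(x) { return isNil(x) ? 'end' : 'value'; }
```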

I think out of these three options, I like the handlers the best, since consumes could have the option of just not passing the handlers if they don't have special behavior (and they'd default to something like push(null, nil)). That simplifies the boilerplate required for most consumes, and it also feels more like the through API, which takes a write handler and an end handler.

LewisJEllis avatar Jan 08 '15 16:01 LewisJEllis

I'm not sure about the exact API yet; I like handlers better than special values too. Using a special value may be complicated to implement.

I was thinking more along the lines of

stream.consume(...).onCancel(...);

// Unlike done, onDone just registers a handler. It doesn't cause a thunk.
stream.consume(...).onDone(...);

I like this for a few reasons:

  1. It keeps backcompat.
  2. We can register an onCancel handler without having to write a consume handler.

We could also go with your handlers-to-consume option and assume that an undefined onCancel means "just push _.nil" to keep backcompat.

vqvu avatar Jan 08 '15 17:01 vqvu

@vqvu I dig that syntax. Still on the fence about having a separate error handler, though you can do that with the current api using .errors, it's just a bit awkward.

ibash avatar Jan 19 '15 17:01 ibash

As another data point, I ended up implementing the unix-sort / unix-join. _.nil is not back-propagated, so there could be some issues with slowly leaking resources.

ibash avatar Jan 21 '15 18:01 ibash

At the very least, how about Stream.end() walking back up the stream.source property until it hits null? Then it could write nil to that stream, which should flow back down.
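As a sketch of that idea (illustrative only; highland's internals differ):

```javascript
// end() follows source links up to the root, then writes nil there so it
// can flow back down through the chain.
var nil = {};  // stands in for _.nil

function Stream(source) {
  this.source = source || null;  // upstream link
  this.written = [];
}

Stream.prototype.end = function () {
  var s = this;
  while (s.source !== null) {
    s = s.source;  // walk back up to the true source
  }
  s.written.push(nil);  // write nil at the root
};

var root = new Stream();
var mid = new Stream(root);
var leaf = new Stream(mid);
leaf.end();
// root.written now holds the nil sentinel; mid and leaf were untouched
```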

jgrund avatar Feb 20 '15 16:02 jgrund

Or we could have Stream.write do the same if it encounters a nil.

jgrund avatar Feb 20 '15 16:02 jgrund

Writing nil won't work if the streams have buffered data. Not to mention that it won't even be emitted until downstream requests another element (which it won't because it's ended). And then we'd have to handle forks in a special way.

It also won't cover the important "generator holding a resource" case.

Plus, some transforms are implemented with pull and not consume, so walking up stream.source may not even reach the true source.

vqvu avatar Feb 21 '15 06:02 vqvu

After working with highland a bit more I think I prefer the syntax recommended here: https://github.com/caolan/highland/issues/172#issuecomment-69215140

I'm also wondering if the cancel is actually needed. Yes, you could waste a bit of work, but I'm not sure the complication of having separate handlers outweighs the benefits. If there were both an onDone and an onCancel handler, would you need both to clean up resources, to play it safe? Or could you get away with just implementing onDone?

ibash avatar Feb 23 '15 02:02 ibash

The chaining syntax

  • lets you bind handlers without having done the consume (I was expecting to allow an array of handlers),
  • lets you bind handlers to streams constructed directly from the constructor without having to add more syntax there,
  • provides a convenient place to bind destructors for forks, which are also not consumes, and
  • looks better to me.

Why do you prefer the other syntax?


Not sure about having separate handlers. The "clean up resources" and "fork" use cases that we have would work with just a unified onDestroy handler, and I agree that having one handler is better if we don't need two.

How about having a unified onDestroy (name subject to change) and just passing the reason (caused by downstream or upstream) to the handler as an argument? That would allow handlers to differentiate between the two cases if they want to, and not if they don't.
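A rough sketch of what that could look like (onDestroy and the reason strings are made up for illustration; none of this is highland's actual API):

```javascript
// Hypothetical unified teardown handler that receives the reason for
// the teardown, so handlers can differentiate if they want to.
function makeStream() {
  var handlers = [];
  return {
    onDestroy: function (fn) {  // register a teardown handler; chainable
      handlers.push(fn);
      return this;
    },
    destroy: function (reason) {  // reason: 'upstream' or 'downstream'
      handlers.forEach(function (fn) { fn(reason); });
    }
  };
}

var events = [];
var s = makeStream().onDestroy(function (reason) {
  events.push('cleanup (' + reason + ')');
});

s.destroy('downstream');  // e.g. a take(n) downstream finished early
// events is ['cleanup (downstream)']
```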

vqvu avatar Feb 23 '15 05:02 vqvu

Or just don't bother passing a reason at all until someone comes up with a good use case for doing so...

The minimalist in me probably prefers this more.

vqvu avatar Feb 23 '15 05:02 vqvu

In my comment above I linked to the chaining syntax -- I prefer it as well, sorry for the misunderstanding. +1 to doing as little as possible until there's a good reason for it.

ibash avatar Feb 23 '15 05:02 ibash

Oh, sorry! My browser must have been misbehaving.

vqvu avatar Feb 23 '15 06:02 vqvu

The case I'm always hitting is needing to rebind destroy to the last transform in a chain so I can destroy the resource as well. Something like:

var stream = _(function (push, next) { /* ... */ });

stream._destructors.push(function () {
  console.log('resource cleanup happens here.');
});

var s2 = stream.map(function () { /* ... */ });

s2.destroy = stream.destroy.bind(stream);

@vqvu You are correct that we would have to resume to get the nil flushed down if the stream is paused and internal pulls would pose a problem.

Given the onDestroy syntax, how would that work in the context of walking back up through a fork?

Say we had a stream that could have forks that are added and destroyed dynamically.

How would one decide the difference between "this fork is done" and "this stream is done"?

jgrund avatar Feb 23 '15 16:02 jgrund