flyd
End streams never seem to end
An end stream never seems to end. This can lead to memory leaks if you need listeners on the end-stream. I have the following code:
appRouter.use('/test/', function(req, resp) {
  let resp$ = flyd.stream();
  flyd.on(v => { resp.write(v) }, resp$);
  flyd.on(v => { resp.end() }, resp$.end);
  doAction(resp$);
});
Now doAction can emit as much output as it wants to resp$, which is written to the response. When it is done, it simply calls resp$.end(true) to end the response. But it seems that the end stream itself is never closed afterwards, so its handler stays alive, which leaks the whole closure (including the resp object, for example).
Or is there maybe a way to listen for a stream ending without using 'on' on the end-stream?
Yes, that is true. If end streams had an end stream then that stream should also have an end stream. And it would never end.
Since there is only something emitted once from an end stream it should probably detach listeners and subscription once that one thing is emitted.
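To illustrate the "emit once, then detach" idea, here is a minimal sketch. It is a toy one-shot signal, not flyd's implementation; `createEndSignal`, `listen`, `emit`, and `listenerCount` are hypothetical names made up for this example.

```javascript
// Hypothetical one-shot end signal (NOT flyd's actual code).
function createEndSignal() {
  let listeners = [];
  let ended = false;
  return {
    // Register a listener; ignored once the signal has already fired.
    listen(fn) {
      if (!ended) listeners.push(fn);
    },
    // Fire at most once, then drop all listeners so the closures
    // they hold (e.g. a response object) can be garbage collected.
    emit() {
      if (ended) return;
      ended = true;
      const toCall = listeners;
      listeners = [];
      toCall.forEach((fn) => fn(true));
    },
    listenerCount() {
      return listeners.length;
    },
  };
}

const end = createEndSignal();
let calls = 0;
end.listen(() => { calls += 1; });
end.emit();
end.emit(); // second emit is a no-op
console.log(calls, end.listenerCount()); // 1 0
```

Because the listener array is replaced on the first emit, nothing keeps a reference to the handlers afterwards.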
@paldepind, I was actually thinking of how to nicely represent end streams (I've been tinkering with a stream representation inspired by flyd -- trying to figure out what the concerns should be), and my answer to the end stream problem is that you can have it point to itself. Which is almost what's happening now with it being dependent on the trueFn.
Really, then, Stream is a recursive type.
So,
var s = stream();
s.end === s.end.end;
Which makes perfect sense -- if s.end() === true then we expect s.end.end() === true and so forth, and none of the "is ended" logic fails.
What do you think, @paldepind? I think this deserves attention, and @c-dante's solution makes a lot of sense.
We could take a hint from mithril's stream implementation:
https://github.com/MithrilJS/mithril.js/blob/next/stream/stream.js#L24-L39
They use Object.defineProperty to create a getter for the end stream. This means the end stream is not created until it is needed, and s.end can be just a normal stream created with the same mechanism as any other stream.
s.end.end !== s.end though
@nordfjord
s.end.end !== s.end though
Do you mean that it is not implemented that way in mithril, or that you are against it? @c-dante's idea can be implemented with a getter too (the end stream just needs to be constructed with a special version of the getter that always returns the end stream itself).
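A rough sketch of that getter approach, assuming a toy stream rather than flyd's or mithril's real code. `createStream` and its `isEndStream` flag are hypothetical names used only for illustration.

```javascript
// Toy stream with a lazily created, self-referencing end stream.
// This is an illustration, NOT flyd's or mithril's implementation.
function createStream(isEndStream = false) {
  let value;
  function s(...args) {
    if (args.length === 0) return value; // read the current value
    // An end stream accepts a single `true` and ignores everything else.
    if (isEndStream && (value === true || args[0] !== true)) return s;
    value = args[0];
    return s;
  }
  let end; // created on first access only
  Object.defineProperty(s, "end", {
    get() {
      if (isEndStream) return s; // the end stream is its own end stream
      if (end === undefined) end = createStream(true);
      return end;
    },
  });
  return s;
}

const s = createStream();
console.log(s.end === s.end.end); // true
s.end(true);
console.log(s.end(), s.end.end()); // true true
```

With this shape the end stream is built by the same factory as any other stream, and the only special case lives in the getter.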
This idea appeals to me because it guarantees that an end stream emits true only once and ignores any later values. It protects us from some situations where an end is emitted multiple times, and it greatly simplifies mutual stream ending:
on(s1.end, s2.end)
on(s2.end, s1.end)
This snippet makes sense when we need not only to end a stream "forward" but also to propagate the ending backward. In flyd (without the logic above) this is an infinite loop :) You need to introduce at least one guard:
on(() => s1.end() || s1.end(true), s2.end)
on(s2.end, s1.end)
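For comparison, here is a small sketch of why a one-shot end stream would not need the guard. It uses a toy `oneShotEnd` and a toy `on` that only mirrors flyd's `on(fn, stream)` argument order; both are hypothetical stand-ins, not flyd's API.

```javascript
// Hypothetical one-shot end stream (a toy, not flyd's implementation).
function oneShotEnd() {
  let ended = false;
  let listeners = [];
  function end(...args) {
    if (args.length === 0) return ended;      // read: has it ended?
    if (ended || args[0] !== true) return end; // ignore repeats and non-true
    ended = true;
    const toCall = listeners;
    listeners = [];                            // detach before notifying
    toCall.forEach((fn) => fn(true));
    return end;
  }
  end.listen = (fn) => { if (!ended) listeners.push(fn); };
  return end;
}

// Same argument order as flyd's on(fn, stream).
const on = (fn, stream) => stream.listen(fn);

const end1 = oneShotEnd();
const end2 = oneShotEnd();
on(end1, end2); // when end2 fires, end end1
on(end2, end1); // when end1 fires, end end2

end1(true); // propagates to end2, whose write back to end1 is ignored
console.log(end1(), end2()); // true true
```

The cycle stops after one round trip because the second write to `end1` hits the `ended` check and returns immediately.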
I was referring to @c-dante's comment about Stream being a recursive type; mithril doesn't treat it as such. But you're correct, we can implement it as a recursive type that way! Didn't think of that.
Let's make a PoC, here are some test cases to start:
// Assumes flyd is installed and a mocha-style runner provides describe/it.
const assert = require('assert');
const { stream, on } = require('flyd');

describe('ending', () => {
  it('mutually ending streams do not cause infinite loops', () => {
    const s1 = stream();
    const s2 = stream();
    on(s1.end, s2.end);
    on(s2.end, s1.end);
    // if an infinite loop occurs we never reach this point.
    assert.equal(true, true);
  });
  it('detaches listeners when pushed to', () => {
    const s = stream();
    const result = [];
    s.end.map(result.push.bind(result));
    s.end(true);
    s.end(true); // the second push must not reach the listener
    assert.deepEqual(result, [true]);
  });
  it("an end stream's end stream is the end stream itself", () => {
    const s = stream();
    assert.equal(s.end, s.end.end);
    assert.equal(s.end(), s.end.end());
    s.end(true);
    assert.equal(s.end(), s.end.end());
  });
});
I've noticed that the type signature for end is different from that of regular streams (Flow). I believe this is because end streams have no end. In my typed code I sometimes move away from flyd$Stream and use a weaker function definition as the stream abstraction.
Implementing @c-dante's idea would fill that gap in the typings: an end stream would be a fully fledged stream, because it would have all the attributes of a regular stream.