ts-stream icon indicating copy to clipboard operation
ts-stream copied to clipboard

Promise-based (abort-)chaining may keep unnecessary links

Open poelstra opened this issue 8 years ago • 10 comments

Callbacks passed to forEach() are automatically 'unset' when the stream is ending, which breaks the link from one stream to the next after it's done. However, links between streams still remain due to e.g. the mutual aborted() promises being linked. This is not ideal in terms of e.g. garbage collection.

Not sure of a good way to fix this yet, though.

poelstra avatar Jul 21 '15 19:07 poelstra

Since an aborted+ended stream is useless afterward, I suppose it's up to the source and sink to end the promise chains and drop the references to the stream. I don't think that could cause an actual garbage collection issue?

rogierschouten avatar Jul 23 '15 06:07 rogierschouten

That is true, and the trouble isn't really when the stream is aborted (because the promise is then rejected, thus 'breaking' the link).

The trouble is when a stream is NOT aborted, in which case aborted()'s promise is never resolved, and there's no way to 'detach' a promise.

As long as everyone drops references to the stream objects, they will be collected, so it shouldn't be a big issue, but I feel it's something that's how it should be.

poelstra avatar Jul 23 '15 07:07 poelstra

Maybe make a distinction between fulfilling and rejecting an abort promise, and always fulfilling it on end?

rogierschouten avatar Jul 24 '15 07:07 rogierschouten

Hmmm, that's a very interesting idea indeed, and in terms of implementation is trivial.

However, I'm not entirely happy with the name of that method then... Seems weird that aborted() resolves when the stream isn't actually aborted. I'll have to think about this somewhat more. Maybe if it's 'integrated' into some other 'has stream ended' method it would be less surprising.

On the other hand, I'm also not entirely happy with the way each Transform has to 'manually' link streams together for aborting. If I can move that to transform() somehow, this is much less of a problem, because linking an unlinking may then be more 'involved'. Moving it into transform() is not possible now though, because of the transforms only requiring a Readable and Writable, not ReadableStream and WritableStream.

poelstra avatar Jul 24 '15 08:07 poelstra

how about using a signal/event for aborted instead of a promise? After all, it isn't really a promise anyway.

rogierschouten avatar Jul 24 '15 18:07 rogierschouten

I'm interested in implementing this. I'd like to change the name to halt() and have it treat no argument as a "clean" abort, a string argument as a "clean" abort with a reason, and an Error as the current behaviour. But this would be a big breaking change at this point.

MattiasMartens avatar Jul 22 '20 08:07 MattiasMartens

We can certainly discuss that, but this issue is about something different: it is not about the abort() function, but the aborted() function.

For context, a little history: aborted() returns a promise, that is rejected with the abort error if the stream is aborted. Original reason for making it a rejected promise was to be able to have long stack traces. ts-stream used to depend on ts-promise (before Promises were a built-in concept in JS), and ts-promise provides some very nice functionality where you can get 'async' stack traces. As long as you properly 'chain' the promises together, all of that works out-of-the-box.

Nowadays, we're using native Promises here, so this no longer holds, and, as indicated in the OP, this nicety comes at the cost that if the stream is never rejected, we basically keep references 'up and down' the stream due to all these always-pending abort-promises.

My current (well, years old, actually) thinking is to change the contract of aborted() to not always return a promise anymore. Something like:

// for backwards compatibility
aborted(): Promise<never>;
// new way: onAbort will be called when stream is aborted, returned function can be
// called to unregister the registered callback if necessary (but see note below)
aborted(onAbort: (err?: Error) => void): () => void;

When the stream is completed (and/or aborted), we can then simply 'forget' all these registered onAbort callbacks, solving the perpetual 'links' issue.

Also, we are then in active control of when the callbacks will be called, which actually solves a race-condition that we currently have, where it is possible that e.g. aborted() did already reject, but the catch() handler attached to it is only called after we already did a write(). writeEach() does not have this problem, but only because it does have access to the internals of the abort mechanism of a stream. If we have the callback mechanism, everyone can have the same ordering guarantees.

I'm not really sure that returning a callback to unregister is really necessary at this stage, because there is no mechanism to unregister any forEach() callbacks either. So it probably wouldn't be useful in practice. So practically, returning void may actually be good enough.

Note that when to call those abort callbacks is not entirely trivial, and also what needs to happen when they throw is rather vague. This was another reason to implement it as a promise chain, because that leaves the handling of any such errors to the end-user.

poelstra avatar Jul 22 '20 14:07 poelstra

My current (well, years old, actually) thinking is to change the contract of aborted() to not always return a promise anymore. Something like:

I like this quite a bit! It's actually pretty similar to what I had in mind for a speculative version of halt() that would hook into the behaviour of abort() for interoperability with existing code.

I'm quite happy to run with this.

Note that when to call those abort callbacks is not entirely trivial, and also what needs to happen when they throw is rather vague.

This is true. I'm not sure what the complexities are of when to run the callbacks, but I guess that would come up in the process of implementing it.

As for what needs to happen when they throw, I think the best-case would be to chain it to the result of the Stream in question (possibly wrapped, to provide context for where it came from). Then the fallback (if result has already resolved, or if there's no practical way to implement that) is to treat it as "must-not-throw", so leave the rejection unhandled (again, possibly wrapping for context).

MattiasMartens avatar Jul 22 '20 18:07 MattiasMartens

I guess that would come up in the process of implementing it.

Or during writing the tests, more likely ;)

On halt(), cancel(), etc, I would propose we open a separate issue to discuss that, because it's an orthogonal issue.

think the best-case would be to chain it to the result of the Stream in question (possibly wrapped, to provide context for where it came from).

I disagree:

  1. we're already sending too much stuff (IMO) down the same channel. For example, if end() was called with an error, and a downstream ender also fails, then there's only a single error we can return.
  2. it would mix errors in that are (can be) unrelated to the stream itself. Right now, result() can only contain errors that somehow originate from one of the elements in the pipeline. But in this case, if some external code attaches an abort handler to the stream and that crashes, it would influence the result of the stream too.

Then the fallback (if result has already resolved, or if there's no practical way to implement that) is to treat it as "must-not-throw", so leave the rejection unhandled (again, possibly wrapping for context).

Whether the callback is to be treated as "must-not-throw" should not be conditional, especially not on runtime state.

But my gut feeling was indeed also that it's probably best to always just treat it as "must-not-throw", and if it does, throw it as unhandled error. Note that it can in this case really be an unhandled error if we want it to, because the method must be synchronous.

The algorithm to do such throwing could be something like:

for (const handler of handlers) {
    try {
        handler(abortError);
    } catch (e) {
        // Throw error as an unhandled error
        setImmediate(() => {
            throw e;
        });
    }
}

Wrapping is possible, too. But to make that really helpful (i.e. if someone prints error.stack), you'll need to ensure that the stack of the wrapped error then includes the stack of the original error etc, which gets messy (especially given that ts-stream should work in node and all browsers). See BaseError to get an idea of the mess you'll get into. So then the thrown error will basically become something similar to that WriteAfterAbortError kind of thing we discussed in #50, but when that is thrown, you'll only see that some abort handler threw, and you still don't really know which one, whereas in the other case you'd probably see the callback itself being mentioned in the stack trace.

poelstra avatar Jul 22 '20 19:07 poelstra

Agree on all of the above.

For wrapping errors, that discussion can continue over in #50.

Otherwise, I think the requirements here are clear enough to start working on a draft pull request. Will post that when it's ready for other's eyes.

MattiasMartens avatar Jul 22 '20 20:07 MattiasMartens