subscribe() should have a symmetric twin unsubscribe()
… so that wrapper functions can use their same existing function reference for both, rather than having to instantiate an AbortController.
Edit: Some people may want to protect their subscription from "unauthorized" unsubscribe(). We can easily achieve this by exempting subscriptions that have been made with an explicit AbortController signal for unsubscribing, or where false was given instead of such a signal. (The latter may be moot if there's a reliable built-in "never" signal available at no additional memory cost.)
Unlike .addEventListener, subscribing to an observable with the same function/observer multiple times produces multiple subscriptions. Originally Observable returned a Subscription object with an .unsubscribe method, however this was changed so that it behaves correctly in both sync and async cases.
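To make the difference concrete, here is a minimal sketch. TinyObservable is a hypothetical stand-in written only for this illustration, not the proposed API; the EventTarget half relies on the standard rule that a listener with the same type, callback, and capture flag is registered only once.

```typescript
// Hypothetical stand-in for an Observable (an assumption for this sketch only).
class TinyObservable<T> {
  readonly #values: readonly T[];
  constructor(values: readonly T[]) {
    this.#values = values;
  }
  // Every call runs the producer again, so the same callback can be subscribed twice.
  subscribe(next: (value: T) => void): void {
    for (const value of this.#values) next(value);
  }
}

let listenerCalls = 0;
const onPing = (): void => {
  listenerCalls += 1;
};

const target = new EventTarget();
target.addEventListener("ping", onPing);
target.addEventListener("ping", onPing); // ignored: same type, callback, and capture flag
target.dispatchEvent(new Event("ping"));

let observerCalls = 0;
const onValue = (_value: number): void => {
  observerCalls += 1;
};

const source = new TinyObservable([1]);
source.subscribe(onValue);
source.subscribe(onValue); // a second, independent subscription

console.log(listenerCalls, observerCalls); // 1 2
```

This is why a hypothetical unsubscribe(fn) keyed on the function reference would be ambiguous: there may be several live subscriptions for the same fn.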
Exactly. I'll also point to https://github.com/WICG/observable/issues/12 for more discussion on the integration with AbortController/AbortSignal.
I see. I'm not 100% sure I understood #12 but it sounds like I could get a Subscription object with an .unsubscribe() method, or an opaque value (like from setTimeout) that I can pass into the Observable's .unsubscribe(). Both would be ok for me, even if not as symmetric as I had hoped. If one of them has an obvious performance and memory benefit, I'd prefer that.
And both make it impossible to solve the synchronous fire hose probably discussed in #12. Please read that issue fully.
I did read it fully. It's just that the example source code there is kinda cryptic to me, partially because it uses generic variable names like "subscriber" when there seem to be at least three different kinds of subscribers involved, so any time I see it I have to try and deduce from context which one it's referring to. It doesn't help either that I have to pay careful attention to whether the variable name carries a final "s" to mark the plural. "Inner" and "outer" aren't very obvious either.
I hope someone will find time to make a better example program to demonstrate the firehose problem. (Edit: I'll ask in the other thread if we can open-source the example and maybe even make it a test case, then I can play around with it.)
As it currently stands, it seems like I could trigger my AbortController at any time if I instantiate one and keep a reference to it around, so I really struggle to see why it would be hard to delegate the reference keeping to the Observable and delegate the controller triggering to a convenience method.
I think if you played around with the synchronous firehose example a bit more to understand it, it would be clear why a convenience method would make it impossible to react to the synchronous firehose case.
For this, one pattern that appears to be becoming more popular would be having subscribe(...) return a disposable token that can be used to unsubscribe. So, something like...
const unsubscribe = observable.subscribe(...);
// The return value is a function that, when called, unsubscribes and cancels the subscription
unsubscribe();
If the return value also supported the Symbol.dispose (from the TC39 Explicit Resource Management proposal) then we could also just do...
using unsubscribe = observable.subscribe(...);
Returning the unsubscribe as a function makes it fairly trivial to incorporate canceling the subscription in response to other events, for example:
someSignal.onabort = observable.subscribe(...);
Indeed, that would be a really neat syntax. If we use subscription objects instead (in order to maybe have other future functionality on them), we could make it bound by default (or be independent of "this") so you could still do it as
someSignal.onabort = observable.subscribe(...).unsubscribe;
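One way to get a "this"-independent method is an arrow-function property. The sketch below assumes a hypothetical SketchSubscription class; nothing in the proposal defines such an object.

```typescript
// Hypothetical subscription object; the name and shape are assumptions for illustration.
class SketchSubscription {
  closed = false;

  // An arrow-function property captures `this` lexically, so the function keeps
  // working after it has been detached from its object.
  readonly unsubscribe = (): void => {
    this.closed = true;
  };
}

const subscription = new SketchSubscription();
const detached = subscription.unsubscribe; // passed around without its object
detached();

console.log(subscription.closed); // true
```

The trade-off is one closure per subscription instead of a shared prototype method.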
For this, one pattern that appears to be becoming more popular would be having subscribe(...) return a disposable token that can be used to unsubscribe.
Hmm, interesting. Where is this becoming more popular? Is this pattern making its way on the web platform? I'm not sure I've seen it a whole lot.
Returning the unsubscribe as a function makes it fairly trivial to incorporate canceling the subscription in response to other events, for example: someSignal.onabort = observable.subscribe(...);
This is a little strange though, because you'd have two ways to abort the subscription:
- The signal passed into subscribe() (if you're concerned about the Observable synchronously emitting events)
- The magical callback function returned from subscribe()
I guess the upside is that if you don't want to / need to cater to the synchronous firehose scenario, you can use the (maybe?) more ergonomic unsubscribe callback returned from subscribe()? Maybe I'm not opposed to adding this capability, but it might be a little confusing.
I'm going to add the "possible future enhancement" label since I think adding the return callback from subscribe() could be an enhancement that need not block the initial API design.
The more I think about the "symmetry" thing on subscribe/unsubscribe and the synchronous firehose problem - I don't think I would want a "subscription" to be returned from "subscribe" anymore.
And with two ways of doing the same thing (stopping the observer), we would have to explain to new developers the difference and when and why to use which. I would almost always explain how to use "takeUntil", because that's the most declarative one.
With the AbortController we already have the other side of "subscribe", but it's called "abort". We cannot change "abort", but we can name "subscribe" whatever we want to make it the "opposite" of "abort" (I'm not a native English speaker so I don't know if there's a word for this).
Just as a thought experiment: What if we don't use the words "subscription" (at the moment there is no subscription returned) and "subscribe"/"subscriber", and use "observe"/"observer" instead? Would this match semantically better with "abort"? Or, to reflect "takeUntil", something like "observeUntil", where we are more or less forced to specify the "until"?
Ignoring the (not yet) returned subscription or not passing an optional AbortSignal is a really easy way to introduce leaks.
Returning a disposal function is, in my opinion, much more ergonomic than integrating with AbortController.
In general, the static methods that create a disposable object (e.g. .when, .subscribe, etc.) should return [T, cleanupFn]. This has several advantages, but the most important is that you can pass the T to another function and not worry that it will be disposed without your knowledge. Whoever called .when owns that value, and can safely rely on that assumption.
In particular, this means that if you have something like an original item (e.g. a subscription) that is protected by a ref counted pointer (implemented in userland), everyone who receives the subscription can also receive a cleanupFn that decrements the ref count, and the ref counted pointer is the only one that can actually call the subscription's dispose.
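A minimal sketch of that ref-counting idea, under stated assumptions: refCounted is a hypothetical userland helper invented for this illustration, and the string "subscription" merely stands in for a real subscription.

```typescript
type Cleanup = () => void;

// Hypothetical helper: shares one value among several holders and runs the real
// dispose only after every holder has released its handle.
function refCounted<T>(value: T, dispose: Cleanup): () => [T, Cleanup] {
  let holders = 0;
  let disposed = false;

  return () => {
    if (disposed) throw new Error("resource already disposed");
    holders += 1;
    let released = false;

    const release: Cleanup = () => {
      if (released) return; // releasing twice must not decrement twice
      released = true;
      holders -= 1;
      if (holders === 0) {
        disposed = true;
        dispose();
      }
    };
    return [value, release];
  };
}

let disposeCalls = 0;
const acquire = refCounted("subscription", () => {
  disposeCalls += 1;
});

const [, releaseA] = acquire();
const [, releaseB] = acquire();
releaseA();
releaseA(); // no effect the second time
const callsAfterFirstRelease = disposeCalls;
releaseB(); // last holder gone, so the real dispose runs

console.log(callsAfterFirstRelease, disposeCalls); // 0 1
```

Each holder only ever sees its own release function, so no holder can dispose the shared value out from under the others.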
Abort controllers, because they need to be passed in at call-time, provide these same guarantees, but IMO are not very ergonomic.
Do we nowadays have a demonstration of the synchronous firehose in free software, so I can play around with it and try to understand that problem? Or could someone who understands it please make a free-as-in-freedom example program? Because if that argument turns out to be sound it might be an important one.
Do we nowadays have a demonstration of the synchronous firehose in free software, so I can play around with it and try to understand that problem?
Nothing has changed from the example in #12, though the example can be even smaller (as per below).
Because if that argument turns out to be sound it might be an important one.
The argument is simple: it's impossible to write a take function that correctly unsubscribes when a synchronous firehose is possible, because unsubscribe isn't returned from .subscribe until after the subscribeCallback has already run:
/**
 * @returns An observable that contains at most <count> values, i.e. the same as [iterator.take](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Iterator/take) but for observables
 */
function take<T>(observable: Observable<T>, count: number): Observable<T> {
  return new Observable(subscriber => {
    let seen = 0;
    const unsubscribe = observable.subscribe({
      next(value) {
        seen += 1;
        if (seen > count) {
          // ReferenceError in the synchronous firehose case: unsubscribe is
          // still uninitialized while observable.subscribe() is running
          unsubscribe();
          subscriber.complete();
        } else {
          subscriber.next(value);
        }
      },
      error: (reason) => subscriber.error(reason),
      complete: () => subscriber.complete(),
    });
    return unsubscribe;
  });
}

const syncFirehose = new Observable((subscriber) => {
  let i = 0;
  // Emits synchronously for as long as the subscriber is active
  while (subscriber.active) {
    subscriber.next(i);
    i += 1;
  }
  return () => {};
});

// This logs 0 through 9, then throws a ReferenceError when the 11th value
// arrives, rather than completing normally like it should
take(syncFirehose, 10).subscribe({
  next(value) {
    console.log(value);
  },
});
If you're not convinced, you can even try it out with this very minimal implementation of Observable that returns unsubscribe:
type Observer<T> = {
  next?: (value: T) => void;
  error?: (reason: any) => void;
  complete?: () => void;
};

class Subscriber<T> {
  #observer: Observer<T> | null;

  constructor(observer: Observer<T>) {
    this.#observer = observer;
  }

  get active(): boolean {
    return this.#observer !== null;
  }

  #close(): void {
    this.#observer = null;
  }

  next(value: T): void {
    if (this.#observer) {
      this.#observer.next?.(value);
    }
  }

  error(reason: any): void {
    const observer = this.#observer;
    this.#close();
    if (observer) {
      observer.error?.(reason);
    }
  }

  complete(): void {
    const observer = this.#observer;
    this.#close();
    if (observer) {
      observer.complete?.();
    }
  }
}

type Unsubscribe = () => void;
type SubscribeCallback<T> = (subscriber: Subscriber<T>) => Unsubscribe;

class Observable<T> {
  readonly #subscribeCallback: SubscribeCallback<T>;

  constructor(subscribeCallback: SubscribeCallback<T>) {
    this.#subscribeCallback = subscribeCallback;
  }

  subscribe(observer: Observer<T>): Unsubscribe {
    return this.#subscribeCallback(new Subscriber(observer));
  }
}
Thanks for your attempt to help! Unfortunately, your new example here doesn't seem to be free software either, at least I can't find a license grant. If you intend to keep it here, please wrap it in <details>…</details> to clutter this thread less. If you are the author and want to release it as free software, one easy way to do it is to move that code into a gist, together with a license statement. I made an example using the MIT license. That way I'll be legally allowed to actually work with your code. (Because in European law the default is "all rights reserved". (simplified, IANAL.))
That gist is good to know about. In the meantime, you should just "break European law" and plug the code into DevTools and play with it.
In the meantime, you should just "break European law" and plug the code into DevTools and play with it.
There's no need. If an argument is presented in a way that artificially restricts the audience's ability to verify it with any tool the audience prefers, that in itself is enough information for the audience to judge the validity. What use would it have for me to maybe understand more about the example code, or maybe even patch it to solve the problem, if I'm then not allowed to share my insights with the world? Let's approach this design choice with a scientific attitude.
That way I'll be legally allowed to actually work with your code.
Yes I wrote it (take is basically just a direct implementation of the spec anyway but with the added unsubscribe), you can do as you wish with it.
or maybe even patch it to solve the problem
The problem is that unsubscribe needs to be made available to the observer before .subscribe finishes returning, but this is inherently incompatible with being able to call it inside .next (which again is called synchronously, before subscribe finishes returning).
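For contrast, here is a rough sketch of why passing a signal in at subscribe time avoids the ordering problem: the AbortController exists before the source starts emitting, so it can be aborted from inside next. SketchObservable, SketchSubscriber, and takeWithSignal are hypothetical names invented for this illustration and only loosely follow the shape discussed in this thread.

```typescript
type SketchObserver<T> = {
  next?: (value: T) => void;
  complete?: () => void;
};

class SketchSubscriber<T> {
  #observer: SketchObserver<T>;
  #closed = false;
  readonly signal: AbortSignal;

  constructor(observer: SketchObserver<T>, signal: AbortSignal) {
    this.#observer = observer;
    this.signal = signal;
  }
  get active(): boolean {
    return !this.#closed && !this.signal.aborted;
  }
  next(value: T): void {
    if (this.active) this.#observer.next?.(value);
  }
  complete(): void {
    if (!this.active) return;
    this.#closed = true;
    this.#observer.complete?.();
  }
}

class SketchObservable<T> {
  #producer: (subscriber: SketchSubscriber<T>) => void;

  constructor(producer: (subscriber: SketchSubscriber<T>) => void) {
    this.#producer = producer;
  }
  // The signal is supplied by the caller, so it exists before any value is emitted.
  subscribe(observer: SketchObserver<T>, signal: AbortSignal): void {
    this.#producer(new SketchSubscriber(observer, signal));
  }
}

function takeWithSignal<T>(source: SketchObservable<T>, count: number): SketchObservable<T> {
  return new SketchObservable<T>((subscriber) => {
    if (count <= 0 || subscriber.signal.aborted) {
      subscriber.complete();
      return;
    }
    const controller = new AbortController(); // created before source.subscribe runs
    // Aborting the outer subscription also stops the inner one.
    subscriber.signal.addEventListener("abort", () => controller.abort(), { once: true });
    let seen = 0;
    source.subscribe(
      {
        next(value) {
          seen += 1;
          subscriber.next(value);
          if (seen >= count) {
            controller.abort(); // safe even while source.subscribe is still running
            subscriber.complete();
          }
        },
        complete: () => subscriber.complete(),
      },
      controller.signal,
    );
  });
}

const firehose = new SketchObservable<number>((subscriber) => {
  let i = 0;
  while (subscriber.active) {
    subscriber.next(i);
    i += 1;
  }
});

const received: number[] = [];
let completed = false;
takeWithSignal(firehose, 3).subscribe(
  {
    next: (value) => {
      received.push(value);
    },
    complete: () => {
      completed = true;
    },
  },
  new AbortController().signal,
);

console.log(received, completed); // [ 0, 1, 2 ] true
```

The firehose loop stops as soon as controller.abort() flips the inner signal, which is exactly the step a returned unsubscribe function cannot perform in time.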
As mentioned in #12, while RxJS does return a thing with unsubscribe from .subscribe, it also has this complicated subscriber chaining thing. This approach makes writing operators in userland more difficult, and it makes implementing and specifying Observable more complicated as well.