
subscribe() should have a symmetric twin unsubscribe()

Open mk-pmb opened this issue 1 year ago • 18 comments

… so that wrapper functions can use their same existing function reference for both, rather than having to instantiate an AbortController.

Edit: Some people may want to protect their subscription from "unauthorized" unsubscribe(). We can easily achieve this by exempting subscriptions that have been made with an explicit AbortController signal for unsubscribing, or where false was given instead of such a signal. (The latter may be moot if there's a reliable built-in "never" signal available at no additional memory cost.)

mk-pmb avatar Jun 06 '24 18:06 mk-pmb

… so that wrapper functions can use their same existing function reference for both, rather than having to instantiate an AbortController.

Unlike .addEventListener, subscribing to an observable with the same function/observer multiple times produces multiple subscriptions. Originally, Observable returned a Subscription object with an .unsubscribe method; however, this was changed so that it behaves correctly in both sync and async cases.
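To illustrate the difference, here is a minimal sketch. TinySource is a hypothetical stand-in for an observable (not the proposed API); it only models the point that every subscribe() call creates a separate subscription, whereas EventTarget ignores a duplicate registration of the same callback.

```typescript
// Hypothetical stand-in for an observable: every subscribe() call registers
// a new, independent subscription, even for the same callback.
class TinySource<T> {
    #observers: Array<(value: T) => void> = [];

    subscribe(next: (value: T) => void): void {
        this.#observers.push(next);
    }

    emit(value: T): void {
        for (const next of [...this.#observers]) next(value);
    }
}

let listenerCalls = 0;
const target = new EventTarget();
const listener = (): void => {
    listenerCalls += 1;
};
target.addEventListener("ping", listener);
target.addEventListener("ping", listener); // ignored: same type and callback
target.dispatchEvent(new Event("ping"));

let observerCalls = 0;
const source = new TinySource<number>();
const observer = (_value: number): void => {
    observerCalls += 1;
};
source.subscribe(observer);
source.subscribe(observer); // a second, separate subscription
source.emit(1);

console.log(listenerCalls, observerCalls); // 1 2
```

With two live subscriptions for the same function, an unsubscribe(observer) call keyed on the function reference alone could not tell which subscription is meant.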

Jamesernator avatar Jun 07 '24 00:06 Jamesernator

Exactly. I'll also point to https://github.com/WICG/observable/issues/12 for more discussion on the integration with AbortController/AbortSignal.

domfarolino avatar Jun 07 '24 14:06 domfarolino

I see. I'm not 100% sure I understood #12 but it sounds like I could get a Subscription object with an .unsubscribe() method, or an opaque value (like from setTimeout) that I can pass into the Observable's .unsubscribe(). Both would be ok for me, even if not as symmetric as I had hoped. If one of them has an obvious performance and memory benefit, I'd prefer that.

mk-pmb avatar Jun 07 '24 21:06 mk-pmb

And both make it impossible to solve the synchronous firehose problem discussed in #12. Please read that issue fully.

domfarolino avatar Jun 08 '24 01:06 domfarolino

I did read it fully. It's just that the example source code there is kinda cryptic to me, partially because it uses generic variable names like "subscriber" when there seem to be at least three different kinds of subscribers involved, so any time I see it I have to try and deduce from context which one it's referring to. Doesn't help either that I have to pay careful attention to whether the variable name carries a final "s" to mark the plural. "Inner" and "outer" aren't very obvious either.

I hope someone will find time to make a better example program to demonstrate the firehose problem. (Edit: I'll ask in the other thread if we can open-source the example and maybe even make it a test case, then I can play around with it.)

As it currently stands, it seems like I could trigger my AbortController at any time if I instantiate one and keep a reference to it around, so I really struggle to see why it would be hard to delegate the reference keeping to the Observable and delegate the controller triggering to a convenience method.

mk-pmb avatar Jun 08 '24 02:06 mk-pmb

I think if you played around with the synchronous firehose example a bit more to understand it, it would be clear why a convenience method would make it impossible to react to the synchronous firehose case.

domfarolino avatar Jun 09 '24 14:06 domfarolino

For this, one pattern that appears to be becoming more popular would be having subscribe(...) return a disposable token that can be used to unsubscribe. So, something like...

const unsubscribe = observable.subscribe(...);

// The return value is a function that, when called, unsubscribes and cancels the subscription
unsubscribe();

If the return value also supported the Symbol.dispose (from the TC39 Explicit Resource Management proposal) then we could also just do...

using unsubscribe = observable.subscribe(...);

Returning the unsubscribe as a function makes it fairly trivial to incorporate canceling the subscription in response to other events, for example:

someSignal.onabort = observable.subscribe(...);

jasnell avatar Jun 10 '24 23:06 jasnell

someSignal.onabort = observable.subscribe(...);

Indeed, that would be a really neat syntax. If we use subscription objects instead (in order to maybe have other future functionality on them), we could make it bound by default (or be independent of "this") so you could still do it as

someSignal.onabort = observable.subscribe(...).unsubscribe;
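A rough sketch of what "bound by default" could look like in userland, assuming a hypothetical Subscription class (nothing like it exists in the current proposal). The method is an arrow-function property, so it does not depend on how it is called.

```typescript
// Hypothetical subscription object; the name and shape are assumptions.
class Subscription {
    #cleanup: (() => void) | null;

    constructor(cleanup: () => void) {
        this.#cleanup = cleanup;
    }

    // Arrow-function property: `this` is captured lexically, so the function
    // still works after being detached from the object.
    readonly unsubscribe = (): void => {
        const cleanup = this.#cleanup;
        this.#cleanup = null; // make repeated calls a no-op
        cleanup?.();
    };
}

let cleanupCalls = 0;
const subscription = new Subscription(() => {
    cleanupCalls += 1;
});

const controller = new AbortController();
// The detached method is passed directly, without .bind() or a wrapper arrow.
controller.signal.addEventListener("abort", subscription.unsubscribe);
controller.abort();
subscription.unsubscribe(); // already cleaned up, does nothing

console.log(cleanupCalls); // 1
```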

mk-pmb avatar Jun 11 '24 01:06 mk-pmb

For this, one pattern that appears to be becoming more popular would be having subscribe(...) return a disposable token that can be used to unsubscribe.

Hmm, interesting. Where is this becoming more popular? Is this pattern making its way on the web platform? I'm not sure I've seen it a whole lot.

Returning the unsubscribe as a function makes it fairly trivial to incorporate canceling the subscription in response to other events, for example:

someSignal.onabort = observable.subscribe(...);

This is a little strange though, because you'd have two ways to abort the subscription:

  1. The signal passed into subscribe() (if you're concerned about the Observable synchronously emitting events)
  2. The magical callback function returned from subscribe()

I guess the upside is that if you don't want to / need to cater to the synchronous firehose scenario, you can use the (maybe?) more ergonomic unsubscribe callback returned from subscribe()? Maybe I'm not opposed to adding this capability, but it might be a little confusing.
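To make the two options concrete, here is a sketch of how the callback from option 2 could be layered on top of the signal from option 1 in userland. SignalSource and subscribeWithUnsubscribe are hypothetical names, and the subscribe(next, { signal }) shape only loosely mirrors the proposal.

```typescript
type Next<T> = (value: T) => void;
type Producer<T> = (next: Next<T>, signal: AbortSignal) => void;

// Hypothetical stand-in for an observable whose subscribe() takes a signal.
class SignalSource<T> {
    #producer: Producer<T>;

    constructor(producer: Producer<T>) {
        this.#producer = producer;
    }

    subscribe(next: Next<T>, options: { signal?: AbortSignal } = {}): void {
        // Without a caller-supplied signal, use one that never aborts.
        const signal = options.signal ?? new AbortController().signal;
        const guarded: Next<T> = (value) => {
            if (!signal.aborted) next(value);
        };
        this.#producer(guarded, signal);
    }
}

// Hypothetical convenience wrapper: creates the controller internally and
// hands back a plain function that aborts it.
function subscribeWithUnsubscribe<T>(source: SignalSource<T>, next: Next<T>): () => void {
    const controller = new AbortController();
    source.subscribe(next, { signal: controller.signal });
    return () => controller.abort();
}

// An asynchronous-style source: values are pushed later via emit().
const listeners = new Set<Next<string>>();
const source = new SignalSource<string>((next, signal) => {
    listeners.add(next);
    signal.addEventListener("abort", () => listeners.delete(next));
});
const emit = (value: string): void => {
    for (const listener of [...listeners]) listener(value);
};

const received: string[] = [];
const unsubscribe = subscribeWithUnsubscribe(source, (value) => {
    received.push(value);
});

emit("a");
unsubscribe();
emit("b"); // not delivered: the subscription was aborted

console.log(received); // ["a"]
```

The wrapper only helps when values arrive after subscribe() has returned. A producer that emits synchronously inside subscribe() would be finished (or would never finish) before the caller ever receives the function.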

domfarolino avatar Jun 11 '24 02:06 domfarolino

I'm going to add the "possible future enhancement" label since I think adding the return callback from subscribe() could be an enhancement that need not block the initial API design.

domfarolino avatar Jun 11 '24 02:06 domfarolino

The more I think about the "symmetry" thing on subscribe/unsubscribe and the synchronous firehose problem - I don't think I would want a "subscription" to be returned from "subscribe" anymore.

And with two ways of doing the same thing (stopping the observer), we would have to explain to new developers the difference and when and why to use which. I would almost always explain how to use "takeUntil", because that's the most declarative one.

With the AbortController we already have the other side of "subscribe", but it's called "abort". We cannot change "abort", but we can name "subscribe" whatever we want to make this the "opposite" of "abort" (I'm not a native English speaker, so I don't know if there's a word for this).

Just as a thought experiment: what if we don't really use the words "subscription" (at the moment there is no subscription returned) and "subscribe"/"subscriber", and use "observe"/"observer" instead? Would this be a better semantic match for "abort"? Or, to reflect "takeUntil", something like "observeUntil", where we are more or less forced to specify the "until"?

Ignoring the (not yet) returned subscription or not passing an optional AbortSignal is a really easy way to introduce leaks.

flensrocker avatar Nov 16 '24 11:11 flensrocker

Returning a disposal function is, in my opinion, much more ergonomic than integrating with an abort controller.

In general, the static methods that create a disposable object (e.g. .when, .subscribe, etc.) should return [T, cleanupFn]. This has several advantages, but the most important is that you can pass the T to another function and not worry that it will be disposed without your knowledge. Whoever called .when owns that value, and can safely rely on that assumption.

In particular, this means that if you have something like an original item (e.g. a subscription) that is protected by a ref counted pointer (implemented in userland), everyone who receives the subscription can also receive a cleanupFn that decrements the ref count, and the ref counted pointer is the only one that can actually call the subscription's dispose.
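As a sketch of that idea, assuming a hypothetical userland helper refCounted (not part of any proposal): each holder gets the item together with a cleanup function that only decrements the count, and the real dispose runs once the last holder has released.

```typescript
type CleanupFn = () => void;

// Hypothetical userland helper: wraps an item and its real dispose function.
// Each retain() hands out the item plus a cleanup that only decrements the
// count; the real dispose runs when the count drops back to zero.
function refCounted<T>(item: T, dispose: CleanupFn): { retain(): [T, CleanupFn] } {
    let count = 0;
    let disposed = false;
    return {
        retain(): [T, CleanupFn] {
            if (disposed) throw new Error("item already disposed");
            count += 1;
            let released = false;
            const release: CleanupFn = () => {
                if (released) return; // each holder can release only once
                released = true;
                count -= 1;
                if (count === 0) {
                    disposed = true;
                    dispose();
                }
            };
            return [item, release];
        },
    };
}

let disposeCalls = 0;
const shared = refCounted({ id: "subscription" }, () => {
    disposeCalls += 1;
});

const [itemA, releaseA] = shared.retain();
const [itemB, releaseB] = shared.retain();

releaseA();
releaseA(); // no-op: this holder already released
const callsAfterFirstRelease = disposeCalls; // still 0, itemB is in use

releaseB(); // last holder gone, the real dispose runs

console.log(itemA === itemB, callsAfterFirstRelease, disposeCalls); // true 0 1
```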

Abort controllers, because they need to be passed in at call-time, provide these same guarantees, but IMO are not very ergonomic.

rbalicki2 avatar Feb 28 '25 02:02 rbalicki2

Do we nowadays have a demonstration of the synchronous firehose in free software, so I can play around with it and try to understand that problem? Or could someone who understands it please make a free-as-in-freedom example program? Because if that argument turns out to be sound it might be an important one.

mk-pmb avatar Mar 01 '25 08:03 mk-pmb

Do we nowadays have a demonstration of the synchronous firehose in free software, so I can play around with it and try to understand that problem?

Nothing has changed from the example in #12, though the example can be even smaller (as per below).

Because if that argument turns out to be sound it might be an important one.

The argument is simple: it's impossible to write a take function that correctly unsubscribes where a synchronous firehose is possible, because unsubscribe isn't returned from .subscribe until after the subscribeCallback has already run:

/**
  * @returns An observable that contains at most <count> values, i.e. the same as [iterator.take](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Iterator/take) but for observables
  */ 
function take<T>(observable: Observable<T>, count: number): Observable<T> {
    return new Observable(subscriber => {
        let seen = 0;
        const unsubscribe = observable.subscribe({
            next(value) {
                seen += 1;
                if (seen > count) {
                    // In the synchronous firehose this is a ReferenceError: `unsubscribe` is
                    // not initialized until observable.subscribe(...) returns
                    unsubscribe();
                    subscriber.complete();
                } else {
                    subscriber.next(value);
                }
            },
            error: (reason) => subscriber.error(reason),
            complete: () => subscriber.complete(),
        });
        return unsubscribe;
    });
}

const syncFirehose = new Observable((subscriber) => {
    let i = 0;
    while (subscriber.active) {
        subscriber.next(i);
        i += 1;
    }
    return () => {};
});

// This throws a ReferenceError when the 11th value arrives (the first one past the limit),
// rather than completing normally like it should
take(syncFirehose, 10).subscribe({
    next(value) {
        console.log(value);
    },
});

If you're not convinced, you can even try it out with this very minimal implementation of Observable that returns unsubscribe:

type Observer<T> = {
    next?: (value: T) => void;
    error?: (reason: any) => void;
    complete?: () => void;
};

class Subscriber<T> {
    #observer: Observer<T> | null;

    constructor(observer: Observer<T>) {
        this.#observer = observer;
    }

    get active(): boolean {
        return this.#observer !== null;
    }

    #close(): void {
        this.#observer = null;
    }

    next(value: T): void {
        if (this.#observer) {
            this.#observer.next?.(value);
        }
    }

    error(reason: any): void {
        const observer = this.#observer;
        this.#close();
        if (observer) {
            observer.error?.(reason);
        }
    }

    complete(): void {
        const observer = this.#observer;
        this.#close();
        if (observer) {
            observer.complete?.();
        }
    }
}

type Unsubscribe = () => void;

type SubscribeCallback<T> = (subscriber: Subscriber<T>) => Unsubscribe;

class Observable<T> {
    readonly #subscribeCallback: SubscribeCallback<T>;

    constructor(subscribeCallback: SubscribeCallback<T>) {
        this.#subscribeCallback = subscribeCallback;
    }

    subscribe(observer: Observer<T>): Unsubscribe {
        return this.#subscribeCallback(new Subscriber(observer));
    }
}

Jamesernator avatar Mar 01 '25 11:03 Jamesernator

Thanks for your attempt to help! Unfortunately, your new example here doesn't seem to be free software either, at least I can't find a license grant. If you intend to keep it here, please wrap it in <details>…</details> to clutter this thread less. If you are the author and want to release it as free software, one easy way to do it is to move that code into a gist, together with a license statement. I made an example using the MIT license. That way I'll be legally allowed to actually work with your code. (Because in European law the default is "all rights reserved". (simplified, IANAL.))

mk-pmb avatar Mar 02 '25 00:03 mk-pmb

That gist is good to know about. In the meantime, you should just "break European law" and plug the code into DevTools and play with it.

domfarolino avatar Mar 02 '25 15:03 domfarolino

In the meantime, you should just "break European law" and plug the code into DevTools and play with it.

There's no need. If an argument is presented in a way that artificially restricts the audience's ability to verify it with any tool the audience prefers, that in itself is enough information for the audience to judge the validity. What use would it have for me to maybe understand more about the example code, or maybe even patch it to solve the problem, if I'm then not allowed to share my insights with the world? Let's approach this design choice with a scientific attitude.

mk-pmb avatar Mar 02 '25 19:03 mk-pmb

That way I'll be legally allowed to actually work with your code.

Yes I wrote it (take is basically just a direct implementation of the spec anyway but with the added unsubscribe), you can do as you wish with it.

or maybe even patch it to solve the problem

The problem is that the observer needs access to unsubscribe before .subscribe returns, because .next is called synchronously while .subscribe is still running. A function that only exists as the return value of .subscribe is inherently incompatible with that.
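For contrast, here is a minimal sketch of take written against a signal that is passed in up front. The Observable and Subscriber classes are simplified stand-ins, and the subscriber.signal property and the { signal } option are assumptions that only loosely mirror the proposal. Because the AbortController exists before the inner subscribe() call starts, next() can cancel while subscribe() is still on the stack.

```typescript
type Observer<T> = { next?: (value: T) => void; complete?: () => void };

class Subscriber<T> {
    #observer: Observer<T>;
    #done = false;
    readonly signal: AbortSignal;

    constructor(observer: Observer<T>, signal: AbortSignal) {
        this.#observer = observer;
        this.signal = signal;
    }

    get active(): boolean {
        return !this.#done && !this.signal.aborted;
    }

    next(value: T): void {
        if (this.active) this.#observer.next?.(value);
    }

    complete(): void {
        if (!this.active) return;
        this.#done = true;
        this.#observer.complete?.();
    }
}

class Observable<T> {
    #subscribeCallback: (subscriber: Subscriber<T>) => void;

    constructor(subscribeCallback: (subscriber: Subscriber<T>) => void) {
        this.#subscribeCallback = subscribeCallback;
    }

    subscribe(observer: Observer<T>, options: { signal?: AbortSignal } = {}): void {
        // Without a caller-supplied signal, use one that never aborts.
        const signal = options.signal ?? new AbortController().signal;
        this.#subscribeCallback(new Subscriber(observer, signal));
    }
}

function take<T>(observable: Observable<T>, count: number): Observable<T> {
    return new Observable<T>((subscriber) => {
        if (count <= 0) {
            subscriber.complete();
            return;
        }
        // Created before subscribing, so it is usable inside next().
        const controller = new AbortController();
        // Propagate cancellation of the outer subscription to the inner one.
        subscriber.signal.addEventListener("abort", () => controller.abort());
        let seen = 0;
        observable.subscribe(
            {
                next(value) {
                    seen += 1;
                    subscriber.next(value);
                    if (seen >= count) {
                        controller.abort(); // stops the firehose loop
                        subscriber.complete();
                    }
                },
                complete: () => subscriber.complete(),
            },
            { signal: controller.signal },
        );
    });
}

const syncFirehose = new Observable<number>((subscriber) => {
    let i = 0;
    while (subscriber.active) {
        subscriber.next(i);
        i += 1;
    }
});

const log: string[] = [];
take(syncFirehose, 3).subscribe({
    next: (value) => {
        log.push(`next:${value}`);
    },
    complete: () => {
        log.push("complete");
    },
});

console.log(log.join(",")); // next:0,next:1,next:2,complete
```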

As mentioned in #12, while RxJS does return a thing with unsubscribe from .subscribe it also has this complicated subscriber chaining thing. This approach makes writing operators in userland more difficult, but also makes implementing and specifying Observable more complicated as well.

Jamesernator avatar Mar 03 '25 01:03 Jamesernator