Ref Counted Observable discussion
After TPAC last week, it was determined that the ideal type of observable might be a ref-counted observable. The ref-counted observable will function roughly as follows:
- The observable is "cold" until the first subscriber.
- When the first subscriber subscribes, the subscription logic is executed.
- Every additional subscriber will share that subscription (it's "hot").
- When all subscribers abort their subscriptions, the ref-count drops to 0, and teardown/unsubscription logic is executed, returning the observable to a "cold" state.
The idea behind this type is to address concerns from one issue (#170), where an RxJS user found it confusing that observables sometimes have side effects and sometimes don't. This would guarantee that for N subscribers, there would be at most one side effect.
This type exists in RxJS-land as the result of share(). The only two issues I can think of, off the top of my head, that will come up with this design are:
- When ref-count drops to zero, it's often desirable to have the teardown wait for some small amount of time to make sure that no new subscribers join shortly after. An example of this would be an observable that was shared amongst multiple web components. It's not uncommon that during a single render, all components may be removed or unmounted, then replaced with all new component instances that were going to subscribe to the same thing. In this case, the ref count would drop to zero, everything would teardown, then moments later new components would mount and subscribe, and the subscription needs to start all over again. To that end, RxJS has made this configurable.
- A very common use case for observables is to have this "share" behavior, but to emit to new subscribers the most recent value that was seen. For example, imagine you have a sensor that collects data once every 10 seconds. Ideally, new subscribers don't have to wait for 9.5 seconds, if the last bit of data showed up 0.5 seconds ago. RxJS has also made this configurable, but we don't have configuration for this in this new design.
Thanks for filing this before I could get to it! I do think this type tends to be a nice middle ground that provides general parity with userland Observables in terms of scheduling & coldness, as well as a fix to the userland issues around unpredictability and extra side-effects incurred with multiple subscriptions to the same Observable.
I want to clarify something that was discussed offline regarding Observable teardown & re-subscription. We discussed what should happen with an Observable that has gone from >=1 subscriber, to 0 subscribers, and back to >=1 subscriber. One thing that was mentioned was that this final subscribe() could throw, and the producer should not be restarted. However, we came to the conclusion last week that this was not feasible because it introduces an untenable amount of unpredictability into the API[^1]. So just for clarity, the model of an Observable becoming hot & then cold, will allow the next subscription to make the Observable hot again, and restart the producer to serve more subscriptions.
To that end, RxJS has made this [unsubscription / teardown] configurable.
In what way is it configurable? I read this as: you can configure the Observable to sometimes make teardown happen immediately / synchronously, and other times delayed by, say, a microtask. That feels a little unpredictable though. How does Rx, or other userland libraries handle this in the ref-counted producer type? I feel like if users sometimes need synchronous teardown and sometimes not, this could be provided by the userland code itself. That said, I'm not entirely opposed to just baking in strict microtask-delayed unsubscription timing.
[^1]: For example, you could get an Observable from some source, save it, and before you subscribe to it, someone else could quickly subscribe and unsubscribe. Therefore, at any point in the future, you could not be sure that subscribe() wouldn't throw.
IMO this is the most reasonable semantic. It makes the most sense for event listeners:
- they're relatively cheap to subscribe/unsubscribe to
- however, unnecessary subscribers could have undesired consequences (e.g. touch/scroll listeners)
- By and large subscribing/unsubscribing to event listeners doesn't throw exceptions
Since event listeners are the initial primary use for observables from within the platform using when, making this the default for the "neutral" (constructed) observable would create a reasonably consistent set of expectations.
*(I am less familiar with current uses of RxJS etc though)
I'm almost completely sold on the ref-counted observable after some deep thought and experimentation. But it does limit the type quite a bit in a few ways.
A very, very common thing through my experience, and what research I've done, is that people want the ability to "replay" the most recent value to new subscribers. This one:
For example, imagine you have a sensor that collects data once every 10 seconds. Ideally, new subscribers don't have to wait for 9.5 seconds, if the last bit of data showed up 0.5 seconds ago.
A use case we have at work is we have a shared worker that is getting a stream of messages from a service, and when new web views start up, they have to subscribe to the stream of messages, but they don't want to wait for the next message, because it might be several minutes before one arrives. So they've created a (non-RxJS) "observable" that simply caches the previous value and sends it to new subscribers. That's not possible with the proposed ref-counted observable.
A more web-platform based use case would be observable-based APIs around things similar to IntersectionObserver. Right now, IntersectionObserver will immediately fire with information when you start observing an element. However, if more than one component needs to know if that element is intersecting or not, the API isn't great. You'd have to cache the last value yourself, or observe the element again (I'm not sure that does anything immediately though?). An observable-based API would be more straightforward:
// Hypothetically... an API like this:
const elementOnScreen = IntersectionObserver.observe(element, { root: null }).map((entry) => entry.isIntersecting);
elementOnScreen.subscribe(handler, { signal });
// and later, somewhere else...
elementOnScreen.subscribe(handler, { signal });
// Which to get anything even close to parity we'd have to do something like this:
const callbacks = new Set();
let isIntersecting = false;
const observer = new IntersectionObserver((entries) => {
  for (const entry of entries) {
    if (entry.target === element && entry.isIntersecting !== isIntersecting) {
      isIntersecting = entry.isIntersecting;
      for (const callback of callbacks) {
        // Average developers screw this up.
        // If the callback throws, we don't want to
        // break the loop and cause producer interference.
        // Observable handles this.
        try {
          callback(isIntersecting);
        } catch (error) {
          reportError(error);
        }
      }
      break;
    }
  }
}, { root: null });
observer.observe(element);
function isElementOnScreen(callback, { signal }) {
  callbacks.add(callback);
  callback(isIntersecting);
  signal.addEventListener('abort', () => {
    callbacks.delete(callback);
    if (callbacks.size === 0) {
      // ref count zero
      observer.unobserve(element);
    }
  }, { once: true });
}
The problem is we need a way to configure new Observable to be able to replay the last value as an opt-in.
In what way is it configurable? I read this as: you can configure the Observable to sometimes make teardown happen immediately / synchronously, and other times delayed by, say, a microtask. That feels a little unpredictable though.
For RxJS it's just an option like share({ resetOnRefCountZero: 3000 }) (wait 3000ms) or even share({ resetOnRefCountZero: async () => { /* whatever */ } }).
That feels a little unpredictable though.
If it's ref-counted, it's already unpredictable. Any given consumer can't "know" they're the last one to end their subscription.
For this API, I think it should be configurable in the new Observable constructor in a second argument of options. new Observable(fn, { beforeReset: async () => {} }) would do, IMO. Scheduler.postTask or anything could be used there. It would only happen on ref count zero... not on producer complete or error, those should immediately reset things.
The problem is we need a way to configure new Observable to be able to replay the last value as an opt-in.
In performance APIs, you play all the buffered values and not only the last one. And then this requires some way to manage/empty the buffer etc... Also buffering only the last event would have implications as you're secretly retaining that value from garbage collection.
I would expect to do this kind of thing as some sort of composition rather than a default behavior, like
element.when("click").buffer(1).subscribe(...)
@noamr .buffer(1) can't be created/composed with a ref-counted observable, unfortunately. We'd need the constructor to allow an opt-in like: new Observable(fn, { buffer: 1 }) or the like... even if we wanted to build a .buffer(1) method.
I don't understand why it "can't" but I don't have the bandwidth to dig deeper unfortunately so will take your word for it.
Seems to me that observables try to create a unified API to things that are in fact subtly different (event targets, promises, different kinds of observers like interaction/resize/performance/mutation). Perhaps it would be good to have some examples of how observable integration would look like in those cases, with emphasis on things like buffering and subscribe/unsubscribe semantics. (*Perhaps these examples exist already, I'm not familiar enough with all past conversations)
I don't understand why it "can't"
The result of observable.buffer(1) would need to replay the last buffered value to every new subscriber. But an always ref-counted observable can't treat every new subscriber differently. Only a "cold" observable can really do that.
I'm not sure I follow. observable.buffer(1) could just always store the last value it received from the single producer. It pushes it to all existing subscribers, and then holds onto it in case any new subscribers come along before the producer pushes the next value. Once it receives the next value, it would drop the old one, push the new one to all existing subscribers, and hold onto it in case any more new subscribers come along before the next value from the producer.
Does that not work?
@domfarolino How could this be implemented on top of a ref-counted observable?
Here it is with a cold observable:
ColdObservable.prototype.buffer = function (bufferSize) {
  let buffer = [];
  const subscribers = new Set();
  let abortController = null;
  return new ColdObservable((subscriber) => {
    subscribers.add(subscriber);
    subscriber.addTeardown(() => {
      subscribers.delete(subscriber);
      if (subscribers.size === 0) {
        // last unsubscription, disconnect from source
        abortController.abort();
        abortController = null;
      }
    });
    // Notify the new subscriber with whatever is in the buffer.
    for (const value of buffer) {
      subscriber.next(value);
    }
    if (subscribers.size === 1) {
      // First subscription, connect to the source.
      abortController = new AbortController();
      this.subscribe({
        next: (value) => {
          buffer.push(value);
          if (buffer.length > bufferSize) buffer.shift();
          for (const subscriber of subscribers) {
            subscriber.next(value);
          }
        },
        error: (error) => {
          buffer = [];
          for (const subscriber of subscribers) {
            subscriber.error(error);
          }
          subscribers.clear();
        },
        complete: () => {
          buffer = [];
          for (const subscriber of subscribers) {
            subscriber.complete();
          }
          subscribers.clear();
        }
      }, { signal: abortController.signal });
    }
  });
};
The problem with an always-ref-counted observable is that the subscriber you get is only the subscriber to the source. It's not the joining subscriber. Calling subscriber.next() on it will notify all listeners.
The design I had in mind would be that you'd have a subscriber object for each consumer, and when the Observable encountered a new value from the producer, it would push it to all N existing subscribers. It would hold onto a single value, and push it to any new subscribers that joined the list of pre-existing subscribers.
Edit
To clarify, the producer would only see a single subscriber, but the implementation would keep track of a number of observers, and whenever any new observers joined, the Observable would be responsible for pushing the single value it holds to the new observer. (I misused the word subscriber in the previous message)
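A rough sketch of that design, to make it concrete. Everything here is an assumption for illustration: LastValueObservable, its private fields, and the choice to drop the held value when the ref count reaches zero are not from the proposal, and only next and addTeardown are modeled.

```javascript
// Hypothetical sketch: one internal subscriber for the producer, many observers,
// and a single held value that is pushed to observers who join late.
class LastValueObservable {
  #producer;
  #observers = new Set();
  #teardowns = [];
  #hasLast = false;
  #last;

  constructor(producer) {
    this.#producer = producer;
  }

  subscribe(next, { signal } = {}) {
    if (signal?.aborted) return;
    const observer = { next };
    this.#observers.add(observer);
    signal?.addEventListener('abort', () => {
      this.#observers.delete(observer);
      if (this.#observers.size === 0) this.#reset();
    }, { once: true });

    if (this.#observers.size === 1) {
      // First observer: start the producer with the single internal subscriber.
      this.#producer({
        next: (value) => {
          this.#hasLast = true;
          this.#last = value;
          for (const o of [...this.#observers]) o.next(value);
        },
        addTeardown: (fn) => this.#teardowns.push(fn),
      });
    } else if (this.#hasLast) {
      // Late joiner: push the held value to this observer only.
      next(this.#last);
    }
  }

  #reset() {
    // Assumption: going cold also forgets the held value.
    const teardowns = this.#teardowns;
    this.#teardowns = [];
    this.#hasLast = false;
    this.#last = undefined;
    for (const fn of teardowns.reverse()) fn();
  }
}

// Usage: the producer is driven by hand so the example is deterministic.
let emitReading = null;
const readings = new LastValueObservable((subscriber) => {
  emitReading = (value) => subscriber.next(value);
  subscriber.addTeardown(() => { emitReading = null; });
});

const first = [];
const late = [];
const c1 = new AbortController();
const c2 = new AbortController();
readings.subscribe((v) => first.push(v), { signal: c1.signal });
emitReading(20.5);
readings.subscribe((v) => late.push(v), { signal: c2.signal }); // receives 20.5 right away
emitReading(21);
```

Note that the first observer never receives a held value in this sketch, because nothing is retained while the observable is cold.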
I guess that would be a bit strange because it would only buffer if you have subscribers, and the first subscriber wouldn't get any buffered events.
In terms of web APIs, every API has its own buffering semantics in a way. I think this would have to be expressed somehow in the shim between the APIs and the observer pipeline (the when function equivalent).
LOL... I'm so sorry, I'm not following what the idea is.
Generally speaking, a ref-counted observable is implemented pretty much the same way a cold-observable is... with the difference being that it has an internal subscriber that forwards everything to a list of external subscribers.
So... if your observable looks like this:
const secondClock = new Observable((internalSubscriber) => {
  const id = setInterval(() => internalSubscriber.next(Date.now()), 1000);
  internalSubscriber.addTeardown(() => clearInterval(id));
});
and you subscribe like this:
secondClock.subscribe(console.log);
secondClock.subscribe(console.log);
You're going to add two Subscribers, one for each subscribe call, to an internal list of some sort... and when the first one is added, an internalSubscriber (also a Subscriber) is created and passed to the initialization/producer function from the constructor... when that internalSubscriber is notified, it will forward the notification to all subscribers in the internal list.
Further, when consumer subscribers unsubscribe (abort) they are removed from the list, if the list gets to length/size 0, then the internalSubscriber will teardown, and the observable will go "cold" again.
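The mechanism described above can be sketched in plain JavaScript. This is a rough illustration, not the proposed spec behavior: RefCountedObservable and its internals are hypothetical names, only next and addTeardown are modeled, and error isolation between consumers is left out.

```javascript
// Hypothetical sketch of a ref-counted observable.
class RefCountedObservable {
  #producer;
  #consumers = new Set(); // the internal list of external subscribers
  #teardowns = [];

  constructor(producer) {
    this.#producer = producer;
  }

  subscribe(next, { signal } = {}) {
    if (signal?.aborted) return;
    const consumer = { next };
    this.#consumers.add(consumer);

    // Aborting removes the consumer; the last one out triggers teardown.
    signal?.addEventListener('abort', () => {
      this.#consumers.delete(consumer);
      if (this.#consumers.size === 0) this.#reset();
    }, { once: true });

    // First consumer: create the internal subscriber and run the producer once.
    if (this.#consumers.size === 1) {
      const internalSubscriber = {
        // Forward every notification to all current consumers.
        next: (value) => {
          for (const c of [...this.#consumers]) c.next(value);
        },
        addTeardown: (fn) => this.#teardowns.push(fn),
      };
      this.#producer(internalSubscriber);
    }
  }

  #reset() {
    // Run teardowns in reverse order and return to the "cold" state.
    const teardowns = this.#teardowns;
    this.#teardowns = [];
    for (const fn of teardowns.reverse()) fn();
  }
}

// Usage: a hand-driven producer instead of setInterval, so the run is deterministic.
let producerRuns = 0;
let emit = null;
const source = new RefCountedObservable((internalSubscriber) => {
  producerRuns++;
  emit = (value) => internalSubscriber.next(value);
  internalSubscriber.addTeardown(() => { emit = null; });
});

const a = new AbortController();
const b = new AbortController();
const seenA = [];
const seenB = [];
source.subscribe((v) => seenA.push(v), { signal: a.signal });
source.subscribe((v) => seenB.push(v), { signal: b.signal }); // shares the running producer
emit(1);
a.abort();
emit(2);
b.abort(); // ref count zero: teardown runs, observable is cold again
```

The producer callback only ever sees internalSubscriber, which is why it has no handle on an individual consumer that joins later.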
Okay, so I put together a ref-counted Observable example in a StackBlitz
It should show the behavior. It will hopefully also demonstrate how there's not really a way to send a previously buffered value to a new consumer with the default interface.
I also like the "ref count" approach to the Observable API, because it's always difficult to explain to new developers when, how and why they must use "share" etc. on an observable. Even after years of using RxJS with Angular I sometimes make this mistake myself. Most of the time I don't get hit by it, because there's only one subscriber, and then the problem doesn't occur. But sometimes you add another subscriber later and then things suddenly get weird. That's why I have a state management layer to wrap side effects like calling an HTTP resource etc., because I have to do that anyway to handle errors.
And I understand the problem with not being able to "buffer".
So here are some thoughts from me: The buffer is some kind of state to me. And observables are not really good at managing state (or they only could because of a replay/buffer mechanism). Currently when talking about state, signals come to my mind (but they don't have to be used to manage state - and they don't really exist in the language or platform yet, just userland). In my experience, when I need some replay mechanism, it's not because I want to process the event behind this, but to display a derived state/value from some source. And then a signal seems a better choice (to me).
Just some random thoughts on this - I may be completely off the track... 😎
Thanks a lot @flensrocker! This is great feedback to be getting. From a purist perspective, I like the idea of separating state management (like a buffer of past events) out from this proposal, so I'm definitely wondering if we can get away with not having a buffer option in the constructor or similar operator. It'd be good to hear from others how important this might be for the Refcounted producer proposal to land.
Really the only issue I have at all with the proposal as it stands, the ONLY issue, is that there's no way to create an observable that "replays" previous values to new subscribers. MAYBE people could subclass it like this?
class BufferedObservable extends Observable {
  // Private fields must be declared before they are used.
  #buffer = [];
  #bufferSize;
  #innerSource;

  constructor(subscriberCall, options) {
    super(subscriberCall);
    this.#bufferSize = Math.max(0, options?.bufferSize ?? Infinity);
    this.#innerSource = this.do((value) => {
      this.#buffer.push(value);
      if (this.#buffer.length > this.#bufferSize) this.#buffer.shift();
    });
  }

  subscribe(subscriber, options) {
    const next = typeof subscriber === 'function'
      ? subscriber
      : subscriber?.next
        ? subscriber.next.bind(subscriber)
        : null;
    if (typeof next === 'function') {
      for (const value of this.#buffer) {
        next(value);
      }
    }
    return this.#innerSource.subscribe(subscriber, options);
  }

  // do this for EVERY method?
  map(...args) {
    return this.#innerSource.map(...args);
  }
}
Overall it sort of sucks, for what is a fairly common use case.
Or we could just add a buffer option to the Observable constructor?
Or we could just add a buffer option to the Observable constructor?
... or a subclass, kind of like how streams have subclasses for different buffering options
I think we have three options here:
- Subclassing + userland buffering, with something like: https://gist.github.com/domfarolino/3043eaa78c35356e052f5385cbb42806.
  - Pros: You can do this in userland without platform support.
  - Cons: It's really messy because you need a clever way to override every operator to return a BufferedObservable instead of a native one. That's necessary because if bufferedObservable is a BufferedObservable with N values in its buffer, and you call bufferedObservable.map(v=>v*2).subscribe(v=>console.log(v)), you need map() here to return an Observable whose subscription algorithm knows how to subscribe to bufferedObservable, so that it empties its buffer and synchronously maps all of the buffered values.
  - Further, .when() returns a native platform Observable. So in order to buffer events, you'd have to build a wrapper around the Observable .when() returns that converts it into a userland buffered Observable.
- Subclassing but with custom bindings to match how JS built-ins are constructed.
  - The code gist above is messy because your subclass has to override every single operator to return a BufferedObservable that knows how to properly subscribe upstream. This could be easier if we made Observables more like JS built-ins (thanks @domenic for the idea), so that each operator doesn't just return a new Observable, but it returns a new this.constructor. This allows for each operator called on a subclass to naturally return an instance of that subclass.
    class DomArray extends Array { constructor(...args) { super(...args); } };
    const array = new DomArray(1, 2, 3);
    const newArray = array.map(x => x * 100); // This is a `DomArray`, not a normal `Array`
  - The con here is that nothing else on the web platform behaves like this besides JS built-ins, apparently. It would be a fairly big exception, and it's unclear if it's worth it. It would support userland subclassing better for Observables, though.
- Full platform support with a {buffer: n} constructor parameter. I guess you'd need a similar parameter in https://wicg.github.io/observable/#dictdef-observableeventlisteneroptions, so that you could tell .when() how to construct the Observable it returns with the right buffer size.
  - Note that I was originally pushing for a .buffer(5) operator, but I think @benlesh proved to me at some point that it's insufficient; a constructor parameter would be required. I can't remember why this is the case, maybe @benlesh could show an example about how an operator wouldn't cut it?
I'm honestly personally not convinced that buffering is so important that it must be included in native Observables immediately. Furthermore, the old model—before moving to shared/refcounted producers—was also vulnerable to missing older values that weren't buffered, if I understand correctly. Imagine a non-shared, cold Observable from .when(). Subscribing immediately registers an event listener, but there is no way to grab events that were fired before your subscription, and we were fine with that.
I guess I lean towards treating buffering stuff as a follow-up if absolutely necessary; waiting for that demand to come in and responding to it accordingly.
A .buffer(5) type method won't work because methods create new observables and return them. You need to be able to identify new subscribers as they join so you can send the buffered values to them. There's not really an exposed way to get at the new subscriber, since that's all created internal to the Observable during the subscribe call.
Couldn't the new Observable returned from .buffer(5) just have its internal subscribe callback forward all buffered values to new subscribers as they join?
After some discussion and thought about when ref-counted observables should unsubscribe, in particular when the reference count hits zero, I think it should happen synchronously for now and probably be configurable later. I say this because:
- Synchronous actions are a bit more deterministic
- It's an uncommon case that one would want to delay the teardown of a resource
- It's probably something folks can compose by delaying the call to .abort(), or even scheduling whatever teardown was going to occur.
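A minimal sketch of that last point, assuming the consumer owns the AbortController it passed to subscribe(). The abortLater helper and its injectable schedule parameter are hypothetical names, not part of any proposal.

```javascript
// Hypothetical helper: abort `controller` later instead of immediately.
// `schedule` is injectable, so the delay policy (timeout, microtask, postTask)
// stays in userland.
function abortLater(controller, schedule = (task) => setTimeout(task, 0)) {
  schedule(() => controller.abort());
}

// Usage with a manual scheduler so the example runs deterministically.
const pending = [];
const manualSchedule = (task) => pending.push(task);

const controller = new AbortController();
let tornDown = false;
controller.signal.addEventListener('abort', () => { tornDown = true; }, { once: true });

abortLater(controller, manualSchedule);
const abortedImmediately = controller.signal.aborted; // still subscribed at this point

// Later, when the scheduler decides to run the queued work:
for (const task of pending.splice(0)) task();
const abortedAfterFlush = controller.signal.aborted;
```

With a ref-counted observable, a new consumer that subscribes between abortLater() and the flush would keep the ref count above zero, so the producer would not be torn down and restarted.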
I do want to point out that under the suggested change Observable.from(asyncOrSyncIterable) is rather strange now. In particular, if you subscribe while iteration is happening you may see only a partial stream. (Similarly, for a promise it is possible to miss the value entirely.)
It may be better to make observables consistently hot here and simply share a single iterator (i.e. every subscription will only see the remaining iterator).
Also, more generally, for some uses with hot observables you might share state outside of the subscription so that you don't need to recreate resources (such as file handles). As such it might be a lot more necessary to make observables disposable so that producers can more eagerly know when no future subscriptions are possible.
It may be better to make observables consistently hot here and simply share a single iterator (i.e. every subscription will only see the remaining iterator).
What does "consistently hot" mean? Are you recommending that after the iterable is exhausted and the subscription is closed, all subsequent subscribe() calls will not restart the iteration, and just be dead, analogous to a Promise that has already resolved?
I'm honestly personally not convinced that buffering is so important that it must be included in native Observables immediately
The whole buffering thing (or even better, the JS platform style subclassing thing) is required to build things like BehaviorSubject, which is a very, very popular type, with more than 200K references to it in public GitHub TypeScript files alone.
What is BehaviorSubject you ask?
A BehaviorSubject can have many names. It's very similar to an "Atom" in some libraries, or a "Signal" (although value delivery is different than a "Signal").
- Create a BehaviorSubject with an initialValue; that value sets an internal lastValue state.
- All new observers that subscribe to the subject will synchronously have the lastValue pushed to them.
- Any calls to BehaviorSubject#next(value) will set lastValue to whatever value is, then notify all current observers of the new value.
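Those three rules can be sketched in a few lines. This is an illustration only, not RxJS's implementation: SketchBehaviorSubject is a hypothetical name, and complete/error handling is omitted.

```javascript
// Hypothetical minimal take on the three rules above.
class SketchBehaviorSubject {
  #lastValue;
  #observers = new Set();

  constructor(initialValue) {
    // Rule 1: the initial value seeds the internal lastValue state.
    this.#lastValue = initialValue;
  }

  subscribe(observer, { signal } = {}) {
    if (signal?.aborted) return;
    // Accept either a callback or an object with a next() method.
    const entry = {
      next: typeof observer === 'function' ? observer : (v) => observer.next?.(v),
    };
    this.#observers.add(entry);
    signal?.addEventListener('abort', () => this.#observers.delete(entry), { once: true });
    // Rule 2: a new observer synchronously receives lastValue.
    entry.next(this.#lastValue);
  }

  next(value) {
    // Rule 3: remember the value, then notify all current observers.
    this.#lastValue = value;
    for (const entry of [...this.#observers]) entry.next(value);
  }
}

// Usage
const subject = new SketchBehaviorSubject(0);
const early = [];
const lateJoiner = [];
subject.subscribe((v) => early.push(v));      // gets the initial value
subject.next(1);
subject.subscribe((v) => lateJoiner.push(v)); // gets the most recent value
subject.next(2);
```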
The use case for this is common and simple: When an observer joins a stream late, the developer wants that observer to get the most recent value, rather than having to wait for a new one to arrive.
const lastKnownMousePosition = new BehaviorSubject({ x: -1, y: -1 });
// Connect the observable to the BehaviorSubject
document.when('mousemove').subscribe(lastKnownMousePosition);
// at any time later we can get the last known mouse position stream
// without needing the user to move the mouse first
setTimeout(() => {
  lastKnownMousePosition
    .take(3)
    .subscribe(console.log);
}, 3000);
There are also many explicit use-cases of BehaviorSubject where folks are using them to create sort of ad-hoc observables that track values for reactivity purposes, generally for UIs. This is often because you'll start some stream of data, and there's no guarantee in most frameworks when side-effects to subscribe to that stream of data will occur, so you don't want to miss the most recent value. (For sake of argument, this is one of the reasons a Promise instance will replay the resolved value to late callers to then())
- Subclassing + userland buffering, with something like: https://gist.github.com/domfarolino/3043eaa78c35356e052f5385cbb42806.
This one won't work at all, unfortunately. The goal is to have the most recent values played to new subscribers and only to the new subscribers. There's just not a way to get this example working in this manner, and, as you said, it's pretty messy.
- Subclassing but with custom bindings to match how JS built-ins are constructed.
(This is currently my preference, having started to work with this)
From my experimentation... if we used new this.constructor() inside every operator, then the sky is the limit for what I can do, and all my asks will basically stop. lol. I could build a "cold observable" for backwards compat with RxJS 1-7, I could build a BehaviorSubject, which is a very, very popular type. Etc, etc.
- Full platform support with a {buffer: n} constructor parameter. I guess you'd need a similar parameter in https://wicg.github.io/observable/#dictdef-observableeventlisteneroptions, so that you could tell .when() how to construct the Observable it returns with the right buffer size.
This will also work, but it's decidedly more cumbersome and less flexible than number 2 above. From my experimentation, it has two problems:
- We'd need to thread through the { buffer: N } config to every observable constructed by operators. Which gets gnarly fast. I guess we'd need to expose those options on the instance somehow like:
Observable.prototype.map = function (mapper) {
  return new Observable((subscriber) => {
    // impl here...
  }, { ...this.config });
};
- It's not obvious to folks trying to build/subclass their own stuff that this is necessary.
- Note that I was originally pushing for a .buffer(5) operator, but I think @benlesh proved to me at some point that it's insufficient; a constructor parameter would be required. I can't remember why this is the case, maybe @benlesh could show an example about how an operator wouldn't cut it?
The reason a buffer operator will not work for what I'm proposing, is basically what this type of buffering does is it replays previously seen values to newly-joined observers. The current spec gives no programmatic access to newly-joined observers, and there's probably not an ergonomic way to provide such a thing.
I guess I lean towards treating buffering stuff as a follow-up if absolutely necessary; waiting for that demand to come in and responding to it accordingly.
If we implement the operators in terms of new this.constructor rather than new Observable, there will be no need for any follow-ups, IMO.
Short-short version:
Implementing all operators on Observable in terms of this.constructor or the like would enable folks like myself to build pretty much anything we need to build off of observable, and the spec can completely forget the buffer case entirely.
In JS pseudo-code that would be:
Observable.prototype.map = function (mapper) {
  return new this.constructor((subscriber) => {
    // impl here...
  });
};
// instead of:
Observable.prototype.map = function (mapper) {
  return new Observable((subscriber) => {
    // impl here...
  });
};
// And for statics...
Observable.from = function (value) {
  return new this((subscriber) => {
    // impl here...
  });
};
// instead of:
Observable.from = function (value) {
  return new Observable((subscriber) => {
    // impl here...
  });
};
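To see what this buys a subclass, here is a small runnable sketch. MiniObservable and CountingObservable are hypothetical stand-ins, not the native Observable; the toy has no teardown, errors, or completion.

```javascript
// Hypothetical toy observable whose operators construct via `this.constructor`
// and whose statics construct via `this`.
class MiniObservable {
  #subscribeFn;

  constructor(subscribeFn) {
    this.#subscribeFn = subscribeFn;
  }

  subscribe(next) {
    this.#subscribeFn({ next });
  }

  map(mapper) {
    const source = this;
    // Returns an instance of whatever class `map` was called on.
    return new this.constructor((subscriber) => {
      source.subscribe((value) => subscriber.next(mapper(value)));
    });
  }

  static from(values) {
    // `this` is the class the static was called on.
    return new this((subscriber) => {
      for (const value of values) subscriber.next(value);
    });
  }
}

// A subclass only overrides what it cares about; operators keep returning it.
class CountingObservable extends MiniObservable {
  static subscribeCalls = 0;

  subscribe(next) {
    CountingObservable.subscribeCalls++;
    super.subscribe(next);
  }
}

// Usage
const doubled = CountingObservable.from([1, 2, 3]).map((x) => x * 2);
const results = [];
doubled.subscribe((value) => results.push(value));
const staysSubclass = doubled instanceof CountingObservable;
```

Because map() goes through this.constructor, the overridden subscribe() is used at every step of the chain without the subclass having to override map() itself.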
Following the limits expressed in https://github.com/WICG/observable/issues/200, what about a property independantSubscriptions: boolean provided in Observable's options?
new Observable(/* ... */, { independantSubscriptions: true /* default: false */});
This property would be stored internally, and all parts related to the Observable's weak subscriber would just be skipped. Thus subscribing twice to an Observable would result in two distinct calls of the subscribe callback.
If [=this=]'s [=Observable/weak subscriber=] is not null and [=this=]'s [=Observable/weak subscriber=]'s [=Subscriber/active=] is true:
Could become:
If [=this=]'s [=Observable/independent subscriptions=] is not true, and if [=this=]'s [=Observable/weak subscriber=] is not null and [=this=]'s [=Observable/weak subscriber=]'s [=Subscriber/active=] is true:
And spec.bs#L780
- Set [=this=]'s [=Observable/weak subscriber=] to |subscriber|.
=>
If [=this=]'s [=Observable/independent subscriptions=] is not true, set [=this=]'s [=Observable/weak subscriber=] to |subscriber|
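In userland terms, the effect of those two spec changes could look roughly like the sketch below. OptionedObservable is a hypothetical stand-in for the native type, the option name keeps the spelling used in the proposal above, and unsubscription is left out to keep it short.

```javascript
// Hypothetical sketch: shared by default, independent when opted in.
class OptionedObservable {
  #producer;
  #independent;
  #sharedObservers = null; // non-null once the shared producer has started

  constructor(producer, { independantSubscriptions = false } = {}) {
    this.#producer = producer;
    this.#independent = independantSubscriptions;
  }

  subscribe(next) {
    if (this.#independent) {
      // Skip the shared-subscriber bookkeeping: each subscribe() runs the producer.
      this.#producer({ next });
      return;
    }
    if (this.#sharedObservers) {
      // A shared producer is already running: just join it.
      this.#sharedObservers.add(next);
      return;
    }
    const observers = new Set([next]);
    this.#sharedObservers = observers;
    this.#producer({
      next: (value) => {
        for (const observer of [...observers]) observer(value);
      },
    });
  }
}

// Usage: count how many times each producer runs for two subscriptions.
let sharedRuns = 0;
const shared = new OptionedObservable(() => { sharedRuns++; });
shared.subscribe(() => {});
shared.subscribe(() => {});

let independentRuns = 0;
const independent = new OptionedObservable(
  () => { independentRuns++; },
  { independantSubscriptions: true },
);
independent.subscribe(() => {});
independent.subscribe(() => {});
```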
I think the idea of having a shared Observable by default is the right choice, however, the current spec forbids concurrent and independent subscriptions on the same Observable, which prevents the creation of replay, buffer, etc. operators.
Such a requirement should probably not be ignored, or we could end up with strange, unoptimized, and/or hazardous individual attempts.