Remove wakers for cancelled tasks
When an async function does:
    async fn f(&mut self) {
        tokio::select! {
            v = self.subscriber().next() => { /* do something with v */ }
            _ = std::future::ready(()) => {},
        }
    }
then the future returned by self.subscriber().next() is cancelled, but the observed object still referenced the waker, preventing the future (and consequently the function's closure) from being dropped even though it will never be scheduled again.
This change is twofold:
- ObservableState is now handed Weak references, so it does not keep futures alive, and a strong reference is kept by whichever object is held by the future awaiting it (Subscriber or Next)
- ObservableState garbage-collects weak references from time to time, so its own vector of wakers does not grow unbounded
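The two points above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual code of this change: `State`, `WakerSlot`, `register` and `wake_all` are hypothetical names, and running the garbage collection whenever the vector is about to reallocate is just one possible reading of "from time to time".

```rust
use std::sync::{Arc, Mutex, Weak};
use std::task::Waker;

/// Hypothetical slot type: the waker lives behind an `Arc` owned by the future.
type WakerSlot = Mutex<Option<Waker>>;

/// Stand-in for the observable's shared state (not eyeball's real definition).
#[derive(Default)]
struct State {
    wakers: Vec<Weak<WakerSlot>>,
}

impl State {
    /// Registers a waker. The caller receives the only strong reference,
    /// so dropping the returned `Arc` also drops the waker.
    fn register(&mut self, waker: Waker) -> Arc<WakerSlot> {
        // Assumed GC policy: before the vector would have to grow,
        // discard every entry whose owner has already been dropped.
        if self.wakers.len() == self.wakers.capacity() {
            self.wakers.retain(|w| w.strong_count() > 0);
        }
        let slot = Arc::new(Mutex::new(Some(waker)));
        self.wakers.push(Arc::downgrade(&slot));
        slot
    }

    /// Wakes every subscriber that is still alive and returns how many were woken.
    fn wake_all(&mut self) -> usize {
        let mut woken = 0;
        for weak in self.wakers.drain(..) {
            // `upgrade` fails for cancelled futures, which are simply skipped.
            if let Some(slot) = weak.upgrade() {
                if let Some(waker) = slot.lock().unwrap().take() {
                    waker.wake();
                    woken += 1;
                }
            }
        }
        woken
    }
}
```

In this sketch the future (or the object it holds) would store the `Arc<WakerSlot>` returned by `register`, so cancelling the future is enough to release the waker without any explicit deregistration call.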
Many thanks for having this log line:
https://github.com/jplatte/eyeball/blob/7ce1b782991cc7e6ad01279203a7ce4312bcbf07/eyeball/src/state.rs#L122
I spent my day trying to figure out why my app was leaking 1MB/s, and seeing "Waking up XXXX waiting subscribers" in my logs made me figure it out :)
Hey, thanks for this PR!
Is this pattern used in other channel implementations? I'm surprised something like this is necessary.
The only other channel I'm familiar with is tokio::sync::mpsc, and its equivalent is to return Err when .send()ing to a channel with no subscriber.
Hm, that does not make sense to me; how is that equivalent? All wakers in eyeball are already consumed when "sending" (setting a new value on the observable), even without this change.
Without this change, they are consumed only when sending. So if we never send (or take a long time between sends) then wakers accumulate.
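To make the accumulation concrete, here is a simplified model of the behaviour described above. It is only a sketch: `LeakyState`, `register`, `set` and `cancelled_polls` are hypothetical names, and it assumes that every pending poll pushes a clone of its waker into a vector that is drained only when a value is set.

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Waker that does nothing; only its reference count matters here.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Simplified model of the pre-change state: strong `Waker`s in a vector.
#[derive(Default)]
struct LeakyState {
    wakers: Vec<Waker>,
}

impl LeakyState {
    /// Called from a pending `poll`: remember whom to wake on the next update.
    fn register(&mut self, waker: &Waker) {
        self.wakers.push(waker.clone());
    }

    /// Called when a new value is set: the only place wakers are consumed.
    /// Returns how many wakers were woken.
    fn set(&mut self) -> usize {
        let n = self.wakers.len();
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
        n
    }
}

/// Simulates `rounds` cancelled polls with no `set` in between and returns
/// how many wakers the state holds afterwards.
fn cancelled_polls(state: &mut LeakyState, rounds: usize) -> usize {
    for _ in 0..rounds {
        let task = Arc::new(NoopWake); // stands in for one short-lived future
        let waker = Waker::from(task);
        state.register(&waker);
        // `waker` is dropped here, as if the `select!` branch was cancelled,
        // but the clone inside `state.wakers` keeps the task allocation alive.
    }
    state.wakers.len()
}
```

In this model the vector grows by one entry per cancelled poll and shrinks only in `set`, which matches the leak pattern of many polls and rare updates.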
Yes, but it sounds like tokio's mpsc suffers from the same problem then?
Hmm, okay, actually: tokio::sync::broadcast. It's built on top of a linked list of messages shared by sender and receiver:
https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L306-L307
and a (sorted) linked list recording the position and waker of each receiver:
https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L312-L313
https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L330-L331
https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L354-L360
and when a receiver is dropped, it removes itself from the list, including the waker:
https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L1332-L1334
(It's not clear to me why the receiver being dropped is always at the tail of the list; I guess what it calls "tail" is actually a pointer to the link that was the tail at the time the receiver was created, and the list is actually doubly-linked.)
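The "remove yourself on drop" idea can be sketched without tokio's intrusive linked list. The code below is a deliberately simplified stand-in and not how tokio implements it: `Registry` and `Receiver` are hypothetical types, and a `HashMap` keyed by an id replaces the list of waiters.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::task::Waker;

/// Hypothetical shared registry; tokio uses an intrusive linked list instead.
#[derive(Default)]
struct Registry {
    next_id: u64,
    waiters: HashMap<u64, Waker>,
}

/// Hypothetical receiver handle that owns at most one entry in the registry.
struct Receiver {
    id: u64,
    shared: Arc<Mutex<Registry>>,
}

impl Receiver {
    fn new(shared: Arc<Mutex<Registry>>) -> Self {
        // Allocate a unique id; the lock guard is released at the end of the block.
        let id = {
            let mut reg = shared.lock().unwrap();
            reg.next_id += 1;
            reg.next_id
        };
        Receiver { id, shared }
    }

    /// Stores (or replaces) this receiver's waker, as a pending `poll` would.
    fn register(&self, waker: &Waker) {
        self.shared.lock().unwrap().waiters.insert(self.id, waker.clone());
    }
}

impl Drop for Receiver {
    fn drop(&mut self) {
        // Remove our waker so a cancelled receiver leaves nothing behind.
        // A poisoned lock is ignored rather than panicking inside `drop`.
        if let Ok(mut reg) = self.shared.lock() {
            reg.waiters.remove(&self.id);
        }
    }
}
```

Compared with weak references plus periodic garbage collection, this design releases the waker immediately on drop, at the cost of taking the shared lock in every destructor.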
Alternatively, we could probably remove all the manual waker management from eyeball and use tokio::sync::watch instead. Unfortunately there is no equivalent in the stdlib, so it would mean making dep:tokio/sync non-optional
eyeball was originally implemented on top of tokio, but that was limiting in a number of ways. I guess we should probably do something like tokio's waker list, but I'm not sure if there are any crates out there that implement this, and it feels a bit wrong to put it right in eyeball.