eyeball icon indicating copy to clipboard operation
eyeball copied to clipboard

Remove wakers for cancelled tasks

Open progval opened this issue 2 years ago • 8 comments

When an async function does:

async f(&mut self) {
    tokio::select! {
        v = self.subscriber().next => { /* do something with v */ }
        _ = std::future::ready() => {},
    }
}

then the future returned by self.subscriber().next is cancelled, but the observed object stilled referenced the waker, preventing the future (consequently, the function's closure) from being dropped even though it won't be scheduled again.

This change is twofold:

  1. ObservableState is now handed Weak references, so it does not keep futures alive, and a strong reference is kept by whichever object is held by the future awaiting it (Subscriber or Next)
  2. ObservableState garbage-collects weak references from time to time, so its own vector of wakers does not grow unbounded

Many thanks for having this log line:

https://github.com/jplatte/eyeball/blob/7ce1b782991cc7e6ad01279203a7ce4312bcbf07/eyeball/src/state.rs#L122

I spent my day trying to figure why my app was leaking 1MB/s and Waking up XXXX waiting subscribers in my logs made me figure it out :)

progval avatar Nov 11 '23 17:11 progval

Hey, thanks for this PR!

Is this pattern used in other channel implementations? I'm surprised something like this is necessary.

jplatte avatar Nov 11 '23 18:11 jplatte

The only other channel I'm familiar with is tokio::mpsc, and its equivalent is to return Err when .send()ing to a channel with no subscriber

progval avatar Nov 11 '23 18:11 progval

Hm, that does not make sense to me, how is that equivalent. All wakers in eyeball are already consumed when "sending" (setting a new value on the observable), even without this change.

jplatte avatar Nov 11 '23 18:11 jplatte

Without this change, they are consumed only when sending. So if we never send (or take a long time between sends) then wakers accumulate.

progval avatar Nov 11 '23 18:11 progval

Yes, but it sounds like tokio's mpsc suffers from the same problem then?

jplatte avatar Nov 11 '23 19:11 jplatte

hmm okay actually, tokio::sync::broadcast. It's built on top of a linked list of messages shared by sender and receiver:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L306-L307

and a (sorted) linked list recording the position and waker of each receiver:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L312-L313

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L330-L331

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L354-L360

and when a receiver is dropped, it removes itself from the list, including the waker:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L1332-L1334

(it's not clear to me why the receiver being dropped is always at the tail of the list; i guess what it calls "tail" is actually a pointer to the link that was the tail at the time the receiver was created and the list is actually doubly-linked)

progval avatar Nov 11 '23 19:11 progval

Alternatively, we could probably remove all the manual waker management from eyeball and use tokio::sync::watch instead. Unfortunately there is no equivalent in the stdlib, so it would mean making dep:tokio/sync non-optional

progval avatar Nov 11 '23 19:11 progval

eyeball was originally implemented on top of tokio, but that was limiting in a number of ways. I guess we should probably do something like tokio's waker list, but I'm not sure if there are any crates out that that implement this, and it feels a bit wrong to put it right in eyeball.

jplatte avatar Nov 11 '23 20:11 jplatte