str0m

`handle_timeout()`: unlimited feedback buffer growth

Open alnyan opened this issue 8 months ago • 21 comments

If left to run for a while, my SFU implementation started crashing with out-of-memory errors. I ran heap profiling on it and found that the stack trace responsible for most of the memory consumption (and which keeps growing over the program's lifespan) is:

  • -- Frames for the SFU code itself --
  • str0m::Rtc::handle_input
  • str0m::Rtc::do_handle_timeout
  • str0m::session::Session::handle_timeout
  • str0m::streams::Streams::handle_timeout
  • str0m::streams::send::StreamTx::create_sr_and_update
  • -- VecDeque impl + some malloc/profiling frames --

In the code, I see str0m push SenderReport + SourceDescription feedback items if the stream in question is in the mids_to_report list, which is built in the streams::Streams::handle_timeout() function.

For some context, I am polling the outputs in a loop: I call rtc.poll_output() for each of the streams; if a Timeout is returned, I sleep and call rtc.handle_input(Instant::now()) to advance the internal timer of the Rtc instance, and if Transmit or Event is returned, I break out of the loop with the data.

I am not yet sure if this is an actual issue with str0m itself or with my specific usage of it, but maybe capping the number of SenderReport feedback items would be helpful?

alnyan avatar Apr 08 '25 13:04 alnyan

Here is a flamegraph from my attempt at profiling the memory leak; I clipped it to the frame which seems to be "leaking" memory: [flamegraph image attached]

alnyan avatar Apr 08 '25 13:04 alnyan

Hi @alnyan!

Thanks for reporting this. It's very strange.

algesten avatar Apr 08 '25 14:04 algesten

If we look here:

https://github.com/algesten/str0m/blob/main/src/session.rs#L642-L644

we prioritize sending feedback over regular media packets, and on top of that, SenderReports are sorted first (follow the code down from here): https://github.com/algesten/str0m/blob/main/src/session.rs#L658-L693

So for us to fail to empty the feedback, something seriously strange must be happening. Since you say you're polling until Timeout, we should be sending everything there is to send.

There could be some bug in the Rtcp::pack code so that we fail to stuff any feedback into a UDP packet. Maybe you can run this test and add some println calls in session.rs to confirm that feedback_tx is indeed growing all the time?

algesten avatar Apr 08 '25 14:04 algesten

I cannot reproduce the issue by running the code locally, unfortunately. I first observed this problem when running a deployment with ~800 streams, of which most are paused and ~10 are active (streaming actual data) at a time. I'll try to reproduce it tomorrow and will let you know if I come up with any results.

alnyan avatar Apr 08 '25 19:04 alnyan

I've managed to reproduce the issue: the feedback items keep piling up if either:

  • Transmission of an Output::Transmit fails, which I assume is unlikely, since I'm using UDP sockets as a transport
  • Reception of an incoming UDP datagram fails (e.g. the remote client loses connectivity or goes offline for some reason)

I've confirmed this by commenting out either rtc.handle_input() for the received UDP datagrams or the UDP transmit code path, and printing log messages with the feedback list size in Session::handle_timeout(). With either commented out, I observe the list growing by 2 entries every second.

This makes me believe that the mechanism for "clearing" the VecDeque of feedback items relies on the remote peer properly "acknowledging" them: an item needs to be (1) sent by the SFU and received by the remote peer and (2) "acknowledged" in some way back to the SFU. If either of these steps is missing, the items keep piling up until the session dies due to a timeout.

alnyan avatar Apr 09 '25 11:04 alnyan

@alnyan can you post some example code?

algesten avatar Apr 13 '25 15:04 algesten

Here is an example event handling loop where I observe the feedback buffer growth:

        let mut outputs = vec![];
        let mut events = vec![];

        let mut next = Instant::now() + Duration::from_millis(10);

        loop {
            match rtc.poll_output()? {
                str0m::Output::Timeout(timeout) => {
                    if timeout < next {
                        next = timeout;
                    }
                    break;
                }
                str0m::Output::Event(event) => events.push(event),
                str0m::Output::Transmit(transmit) => outputs.push(transmit),
            }
        }

        // Transmit the transmits
        for output in outputs {
            udp::transmit(&output).await.ok();
        }

        for event in events {
            // ... handle events ...
            log::info!("{event:?}");
        }

        futures::select! {
            event = event_fut.fuse() => match event {
                Some(SessionEvent::UdpReceive(remote, data)) => {
                    let now = Instant::now();
                    let destination = (ip, port).into();
                    let input = Input::Receive(
                        now,
                        Receive::new(Protocol::Udp, remote.into(), destination, &data[..])?
                    );
                    if rtc.accepts(&input) {
                        // NOTE: handle_input() is commented out and this leads to feedback items piling up
                        // rtc.handle_input(input)?;
                    }
                }
                _ => (),
            },
            _ = util::sleep_until(next).fuse() => (),
        }

        // Advance the RTC's internal time
        rtc.handle_input(Input::Timeout(Instant::now()))?;

alnyan avatar Apr 14 '25 07:04 alnyan

@alnyan the problem here is that you will often get multiple outputs/events for every input. The way this is structured, you'd be forced to get a timeout/handle_input to exhaust the upper loop.

If you look at the example code:

loop {
    let timeout = match rtc.poll_output().unwrap() {
        // Stop polling when we get the timeout.
        Output::Timeout(v) => v,

        Output::Transmit(v) => {
            socket.send_to(&v.contents, v.destination).unwrap();
            continue;
        }

        Output::Event(v) => {

            // Abort if we disconnect.
            if v == Event::IceConnectionStateChange(IceConnectionState::Disconnected) {
                return;
            }
            continue;
        }
    };
...

The continue statements for Transmit and Event are crucial.
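
For completeness, here is a rough sketch of what typically follows such a loop in the blocking-socket style of the str0m examples; `socket` (a std::net::UdpSocket), `buf`, and the error handling are illustrative assumptions, not code from this thread:

// Sketch only, assuming a blocking std::net::UdpSocket `socket` and a receive
// buffer `buf`; based on the general pattern in str0m's examples, not verbatim code.
let duration = timeout
    .saturating_duration_since(Instant::now())
    .max(Duration::from_millis(1)); // a zero read timeout is not allowed

socket.set_read_timeout(Some(duration)).unwrap();

let input = match socket.recv_from(&mut buf) {
    Ok((n, source)) => Input::Receive(
        Instant::now(),
        Receive::new(
            Protocol::Udp,
            source,
            socket.local_addr().unwrap(),
            &buf[..n],
        )
        .unwrap(),
    ),
    // No packet arrived before the deadline (or the read failed):
    // advance str0m's internal time instead.
    Err(_) => Input::Timeout(Instant::now()),
};

rtc.handle_input(input).unwrap();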

algesten avatar Apr 14 '25 11:04 algesten

Oh wait, you have a different structure with a shorter loop and break.

algesten avatar Apr 14 '25 11:04 algesten

Yes. The loop is designed to consume all the transmits/events and then break once the timeout is reached. Also, it is weird that the same "comment out the handle_input() call" way of reproducing this issue does not work for the chat example in the str0m repo.

alnyan avatar Apr 14 '25 13:04 alnyan

I have an idea for a possible fix: we can use the timestamp in the SR to drop all entries older than N seconds, which would limit the buffer growth. I'm not sure if this is a correct solution, but if you're okay with it, I can try to do a PR for that.
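
To illustrate the idea, a minimal sketch only; the `(Instant, T)` queue shape here is hypothetical and not str0m's actual internal representation:

use std::collections::VecDeque;
use std::time::{Duration, Instant};

const MAX_FEEDBACK_AGE: Duration = Duration::from_secs(10);

// Drop queued feedback entries older than MAX_FEEDBACK_AGE.
// Entries are assumed to be pushed in chronological order, oldest at the front.
fn prune_stale_feedback<T>(queue: &mut VecDeque<(Instant, T)>, now: Instant) {
    while let Some((created, _)) = queue.front() {
        if now.duration_since(*created) > MAX_FEEDBACK_AGE {
            queue.pop_front();
        } else {
            break;
        }
    }
}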

alnyan avatar Apr 21 '25 07:04 alnyan

@alnyan we are using str0m in production in an SFU and are not seeing this problem. I'm trying to understand what's different in your setup. I don't want a band-aid; I'd rather understand the issue.

algesten avatar Apr 21 '25 09:04 algesten

> I cannot reproduce the issue by running the code locally, unfortunately. I first observed this problem when running a deployment with ~800 streams, of which most are paused and ~10 are active (streaming actual data) at a time. I'll try to reproduce it tomorrow and will let you know if I come up with any results.

I think this is interesting. It's a big deployment with a lot of paused streams.

algesten avatar Apr 21 '25 09:04 algesten

@alnyan maybe a dumb question.

Could it be the entire future is stuck? Like on event_fut or udp::transmit?

Could you add a debug log to a fork of str0m that logs the length of feedback_tx, so that you can see whether it's a single instance growing or many individual ones?

algesten avatar Apr 21 '25 09:04 algesten

@alnyan another theory is TWCC.

https://github.com/algesten/str0m/blob/main/src/session.rs#L272

TWCC is inserted before all other feedback. If you don't do handle_input, you probably get no TWCC. If you are using SDP, it's possible to disable TWCC by removing this line:

        "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",

(might be another extmap number in your case)

algesten avatar Apr 21 '25 09:04 algesten

> @alnyan maybe a dumb question.
>
> Could it be the entire future is stuck? Like on event_fut or udp::transmit?

I don't think so: udp::transmit finishes basically immediately, and event_fut fires any time there is a UDP packet related to the session.

> Could you add a debug log to a fork of str0m that logs the length of feedback_tx, so that you can see whether it's a single instance growing or many individual ones?

Just tested: it's multiple stream instances, most likely the ones for which connectivity has been lost (either due to networking issues or, more likely, because the clients disconnected).

alnyan avatar Apr 21 '25 10:04 alnyan

> @alnyan another theory is TWCC.
>
> https://github.com/algesten/str0m/blob/main/src/session.rs#L272
>
> TWCC is inserted before all other feedback. If you don't do handle_input, you probably get no TWCC. If you are using SDP, it's possible to disable TWCC by removing this line:
>
>         "a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
>
> (might be another extmap number in your case)

I don't think this is TWCC, as it doesn't show up as a source of memory allocations in the flamegraph, at least not to the extent that create_sr_and_update does.

alnyan avatar Apr 21 '25 10:04 alnyan

I mean that TWCC is somehow crowding out the other feedback.

algesten avatar Apr 21 '25 10:04 algesten

Just tested disabling TWCC in both the SFU's accepted offer and the returned answer; the result didn't change: the feedback buffer still keeps growing if handle_input is not called.

alnyan avatar Apr 30 '25 21:04 alnyan

For some more detail, here are the elements of the growing buffer when I comment out handle_input(), as seen at one iteration:

2025-05-04T07:42:20.077253Z  INFO str0m::session: ReceiverReport(ReceiverReport { sender_ssrc: Ssrc(1778016003), reports: [] })
2025-05-04T07:42:20.077257Z  INFO str0m::session: ExtendedReport(ExtendedReport { ssrc: Ssrc(4252946764), blocks: [Rrtr(Rrtr { ntp_time: Instant { tv_sec: 1083, tv_nsec: 87181950 } })] })
2025-05-04T07:42:20.077267Z  INFO str0m::session: ReceiverReport(ReceiverReport { sender_ssrc: Ssrc(1778016003), reports: [] })
2025-05-04T07:42:20.077270Z  INFO str0m::session: ExtendedReport(ExtendedReport { ssrc: Ssrc(2619164994), blocks: [Rrtr(Rrtr { ntp_time: Instant { tv_sec: 1083, tv_nsec: 87181950 } })] })
2025-05-04T07:42:20.077273Z  INFO str0m::session: ReceiverReport(ReceiverReport { sender_ssrc: Ssrc(1778016003), reports: [] })
2025-05-04T07:42:20.077277Z  INFO str0m::session: ExtendedReport(ExtendedReport { ssrc: Ssrc(4252946764), blocks: [Rrtr(Rrtr { ntp_time: Instant { tv_sec: 1084, tv_nsec: 87214692 } })] })
2025-05-04T07:42:20.077280Z  INFO str0m::session: ReceiverReport(ReceiverReport { sender_ssrc: Ssrc(1778016003), reports: [] })
2025-05-04T07:42:20.077283Z  INFO str0m::session: ExtendedReport(ExtendedReport { ssrc: Ssrc(4252946764), blocks: [Rrtr(Rrtr { ntp_time: Instant { tv_sec: 1085, tv_nsec: 87243061 } })] })
2025-05-04T07:42:20.077286Z  INFO str0m::session: ReceiverReport(ReceiverReport { sender_ssrc: Ssrc(1778016003), reports: [] })
2025-05-04T07:42:20.077289Z  INFO str0m::session: ExtendedReport(ExtendedReport { ssrc: Ssrc(4252946764), blocks: [Rrtr(Rrtr { ntp_time: Instant { tv_sec: 1086, tv_nsec: 87271886 } })] })

Both the ReceiverReport and the ExtendedReport keep being added every second.

alnyan avatar May 04 '25 07:05 alnyan

Can you follow the code path in poll_output and add printlns to show why it's not exhausting feedback_tx in your polling loop?

algesten avatar May 04 '25 08:05 algesten

I'm having this same problem myself. My read of the situation is that we always create sender reports in handle_timeout() when we have streams, but only consume them if ICE is established. So if ICE never gets established, these just keep building up.

This seems consistent with the above comment that UDP transmission issues seem to cause it, because ICE cannot be established without those packets making it to their destination.

(Edit: Although I haven't confirmed this theory with a test project yet)

OxleyS avatar May 15 '25 05:05 OxleyS

Perhaps a bit tangential, but this is not the first time I've run into problems with things building up when the connection never establishes properly. Previously, I had an issue with the VP8 packetizer allocating buffers that were never consumed. It turned out I was gating my packet-forwarding logic on IceConnectionStateChange events, when I should have gated it on the Connected event. The difference is whether DTLS is guaranteed to be established or not. If ICE was established but DTLS never was, packets would be buffered indefinitely.

It might be useful to audit for other places where this sort of thing might be happening.

OxleyS avatar May 15 '25 05:05 OxleyS

Interesting! So this problem could all come down to: sessions should not start generating any feedback until we have ICE.

algesten avatar May 15 '25 07:05 algesten

> sessions should not start generating any feedback until we have ICE

If that would significantly complicate the logic, another option is to generate the feedback anyway and simply drop it when polled while send_addr is not ready.
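
A rough sketch of that alternative; the struct and field names here are purely illustrative, not str0m's actual internals:

use std::collections::VecDeque;
use std::net::SocketAddr;

// Hypothetical shape: a feedback queue that knows whether there is
// a usable remote address to send to yet.
struct FeedbackQueue<T> {
    send_addr: Option<SocketAddr>,
    feedback: VecDeque<T>,
}

impl<T> FeedbackQueue<T> {
    fn poll(&mut self) -> Option<T> {
        if self.send_addr.is_none() {
            // No nominated candidate pair yet: discard instead of accumulating.
            self.feedback.clear();
            return None;
        }
        self.feedback.pop_front()
    }
}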

OxleyS avatar May 15 '25 08:05 OxleyS

@alnyan would it be possible for you to test with the latest main branch?

The problem might be fixed.

algesten avatar May 18 '25 09:05 algesten

@algesten Hi, I'm happy to report that this solved the OOM issue on our deployments.

zRedShift avatar Sep 17 '25 07:09 zRedShift

@zRedShift Excellent! Thanks for checking!

algesten avatar Sep 17 '25 07:09 algesten