
strange blocking when subscribing to async sysloop

Open · teamplayer3 opened this issue 1 year ago · 8 comments

IDF version: v4.4, esp-idf-svc: master

I have the following calling sequence:

let mut wifi_driver = ...;

smol::block_on(async move {
    let mut sub_1 = sysloop.as_async().subscribe::<WifiEvent>().unwrap(); // for other logic
    wifi_driver.set_configuration(&Configuration::AccessPoint(
        AccessPointConfiguration::default(),
    )).unwrap();
    wifi_driver.start().unwrap();

    wifi_driver.set_configuration(&Configuration::Mixed(
        ClientConfiguration::default(),
        AccessPointConfiguration::default(),
    )).unwrap();
    
    let mut sub_2 = sysloop.as_async().subscribe::<WifiEvent>().unwrap();

    wifi_driver.start_scan(&ScanConfig::default(), false).unwrap();
    while !matches!(sub_2.recv().await, WifiEvent::ScanDone) {}
    let res = wifi_driver.get_scan_result().unwrap();
})

Now the subscribe() call for sub_2 blocks forever. I tracked the blocking down to esp_event_handler_instance_register() in EspEventLoop::subscribe_raw().

Fixes:

  1. If I remove sub_1 before reconfiguring the Wi-Fi driver (i.e. never create sub_1, or drop it before configuring Wi-Fi as Mixed), the problem goes away.
  2. If I remove the set_configuration(Mixed...) call, the second subscription works, but of course start_scan() then fails because the driver is configured wrongly. (Not really a fix.)

The main problems seem to be the sub_1 subscription and the order of the sub_2 subscription and set_configuration(Mixed...), but I don't understand why. The second fix is also a bit strange.

Is this problem related to the following note from esp-idf?

However, registering the same handler to the same event multiple times would cause the previous registrations to be overwritten.

teamplayer3, Dec 20 '22

The problem with your code boils down to the fact that the async-ified Rust wrapper around the ESP IDF event loops implements a naive broadcast channel: multiple producers (the folks who post to the queue), multiple consumers (the folks who subscribe to the queue), and each consumer sees each produced message.

The problem with naively-implemented Broadcast queues is that unless ALL consumers are actively consuming messages, at some point the queue will overflow and block, as there will be outstanding messages in the queue that are not "seen"/"consumed" by a certain consumer. In your case, the offending consumer is sub_1, as you are never, ever pumping messages from it.
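To make the failure mode concrete, here is a small, std-only Rust model of a naive bounded broadcast queue. It is a hypothetical illustration, not the esp-idf-svc implementation: the capacity of 2 and the event names are made up, and `try_publish` reports "full" where the real C queue would block the posting thread.

```rust
use std::collections::VecDeque;

/// A deliberately naive, bounded broadcast queue (hypothetical; not the
/// esp-idf-svc code). Every subscriber must see every message, so a message
/// can only be freed once ALL subscribers have read it.
struct NaiveBroadcast<T: Clone> {
    capacity: usize,
    buf: VecDeque<T>,
    base: usize,         // absolute index of buf[0]
    cursors: Vec<usize>, // next absolute index each subscriber will read
}

impl<T: Clone> NaiveBroadcast<T> {
    fn new(capacity: usize) -> Self {
        Self { capacity, buf: VecDeque::new(), base: 0, cursors: Vec::new() }
    }

    /// Registers a subscriber that starts reading at the current tail.
    fn subscribe(&mut self) -> usize {
        self.cursors.push(self.base + self.buf.len());
        self.cursors.len() - 1
    }

    /// A real blocking queue would block here when full; we report it instead.
    fn try_publish(&mut self, msg: T) -> Result<(), T> {
        if self.buf.len() == self.capacity {
            return Err(msg);
        }
        self.buf.push_back(msg);
        Ok(())
    }

    fn recv(&mut self, sub: usize) -> Option<T> {
        let msg = self.buf.get(self.cursors[sub] - self.base)?.clone();
        self.cursors[sub] += 1;
        // Free the messages that every subscriber has already seen.
        let slowest = *self.cursors.iter().min().unwrap();
        while self.base < slowest {
            self.buf.pop_front();
            self.base += 1;
        }
        Some(msg)
    }
}

/// Replays the situation from the issue and returns a log of what happened.
fn run_scenario() -> Vec<String> {
    let mut q = NaiveBroadcast::new(2);
    let _sub_1 = q.subscribe(); // never read from, like `sub_1` in the issue
    let sub_2 = q.subscribe();
    let mut log = Vec::new();

    for ev in ["StaStarted", "ApStarted", "ScanDone"] {
        match q.try_publish(ev) {
            Ok(()) => log.push(format!("posted {ev}")),
            Err(ev) => log.push(format!("queue full: {ev} would block")),
        }
        // sub_2 diligently drains everything available to it...
        while let Some(e) = q.recv(sub_2) {
            log.push(format!("sub_2 got {e}"));
        }
    }
    log
}

fn main() {
    for line in run_scenario() {
        println!("{line}");
    }
}
```

Even though sub_2 drains everything, the never-advanced `_sub_1` cursor pins the oldest messages in the buffer, so the third post cannot proceed.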

There is no magical solution; broadcast queues just suffer from this problem. The "non-naive" versions just start skipping messages for some consumers if they lag too far behind, but this is too complex to address in the simple ESP IDF async wrapper we have here.

Does this problem refer to

However, registering the same handler to the same event multiple times would cause the previous registrations to be overwritten. from esp-idf.

No. We don't use this call, as it is kind of deprecated anyway.

ivmarkov, Dec 20 '22

If you have digested the above, there may be one more problem that exacerbates the situation: start, set_configuration and start_scan are not async methods. If neither they nor the functions they call post to the queue, that's not a problem; however, there is no way for me to know, as the Wi-Fi code is not open source. (Perhaps we can check the closest open-source equivalent, which is the Eth driver code.)

If these were async, they would async-block when the queue is full, which would still allow you to pump messages from the subscribers in a different async task, or even in a different branch of a select call. Unfortunately, they cannot be made async, as they are written in C. To make them async, we would need to change their post method to async-block on a full queue instead of blocking-block. But, as I said, this is beyond our capabilities, as we are dealing with C code here.
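The difference between "blocking-block" and "async-block" can be sketched with `std::sync::mpsc::sync_channel` standing in for the event loop queue (an assumption for illustration; the ESP IDF queue is C code and is not modeled here). A blocking `send` on a full bounded channel parks the calling thread, while `try_send` reports `Full`, which is the point where an async post could return `Poll::Pending` and let another task drain the queue:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Posts without blocking. On `Full`, an async post could yield
/// (return `Poll::Pending`) instead of parking the whole thread.
fn post_nonblocking(tx: &SyncSender<&'static str>, ev: &'static str) -> bool {
    match tx.try_send(ev) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false, // queue full: caller could yield here
        Err(TrySendError::Disconnected(_)) => false, // nobody is listening anymore
    }
}

fn main() {
    // Capacity 1, and nobody drains `rx` yet (like an un-pumped subscriber).
    let (tx, rx) = sync_channel(1);
    assert!(post_nonblocking(&tx, "ApStarted"));

    // A blocking `tx.send("ScanDone")` here would hang this thread forever.
    assert!(!post_nonblocking(&tx, "ScanDone"));

    // Once someone drains the queue, posting succeeds again.
    assert_eq!(rx.recv().unwrap(), "ApStarted");
    assert!(post_nonblocking(&tx, "ScanDone"));
    println!("ok");
}
```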

ivmarkov, Dec 20 '22

Now that I think of it, we could implement an opt-in for skipping subscribers (subscribers that start skipping events if they are not pumped). This would turn the event loop into a sort of Signal for those skipping subscribers (see the Signal async synchronisation primitive, e.g. from embassy-sync).

Yet, that won't fix your code specifically, as you are awaiting a specific event in sub_2, and I suppose you plan to do the same for sub_1. Awaiting a specific event with a skipping subscriber is obviously a no-go. Now, if your code were something like:

loop {
    if let Ok(scan_result) = wifi_driver.get_scan_result() {
        // ... process here
        break;
    }

    sub_2.recv().await;
}

... that would work fine with skipping subscribers. And by the way, you can easily implement this pattern by combining an Arc<Signal> (or even an Arc<Notification> from channel-bridge) with the non-async callback-based subscribe method of the event loop.
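The loop above (check the real state first, then wait for a wake-up) can be sketched without async, using only std. `Notification` here is a hypothetical blocking stand-in built from `Mutex<bool>` and `Condvar`, not the channel-bridge type, and `FakeDriver` is a made-up substitute for the Wi-Fi driver; the spawned thread plays the role of the event loop callback:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical value-less notification: `notify` only sets a flag,
/// so several notifications before a `wait` collapse into one.
struct Notification {
    flag: Mutex<bool>,
    cond: Condvar,
}

impl Notification {
    fn new() -> Self {
        Self { flag: Mutex::new(false), cond: Condvar::new() }
    }
    fn notify(&self) {
        *self.flag.lock().unwrap() = true;
        self.cond.notify_one();
    }
    fn wait(&self) {
        let mut flag = self.flag.lock().unwrap();
        while !*flag {
            flag = self.cond.wait(flag).unwrap();
        }
        *flag = false; // consume the notification
    }
}

/// Hypothetical stand-in for the Wi-Fi driver's scan state.
struct FakeDriver {
    results: Mutex<Option<Vec<&'static str>>>,
}

impl FakeDriver {
    fn get_scan_result(&self) -> Result<Vec<&'static str>, ()> {
        self.results.lock().unwrap().clone().ok_or(())
    }
}

fn wait_for_scan(driver: &FakeDriver, notif: &Notification) -> Vec<&'static str> {
    loop {
        // 1. Check the actual state first; no event is needed if it is already there.
        if let Ok(res) = driver.get_scan_result() {
            return res;
        }
        // 2. Otherwise sleep until *something* happens, then re-check.
        notif.wait();
    }
}

fn main() {
    let driver = Arc::new(FakeDriver { results: Mutex::new(None) });
    let notif = Arc::new(Notification::new());

    // Simulated event loop callback: update the state, then poke the notification.
    let (d, n) = (driver.clone(), notif.clone());
    let event_loop = thread::spawn(move || {
        n.notify(); // some unrelated event
        thread::sleep(Duration::from_millis(10));
        *d.results.lock().unwrap() = Some(vec!["ap-1", "ap-2"]);
        n.notify(); // "ScanDone"
    });

    println!("{:?}", wait_for_scan(&driver, &notif));
    event_loop.join().unwrap();
}
```

Because `wait_for_scan` always re-checks the driver state after waking, it does not matter that several notifications collapse into one or that an unrelated event wakes it up.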

Oh well. Who says that async is easy?

ivmarkov, Dec 20 '22

Thanks for your great answer. Now I understand the system behind it and can deal with these problems better.

In your case, the offending consumer is sub_1, as you are never, ever pumping messages from it.

I checked my problem with receiving from sub_1 and it blocks as well.

Yet, that won't fix your code specifically, as you are awaiting a specific event in sub_2 and I suppose you plan to do the same for sub_1. Awaiting a specific event with a skipping subscriber is obviously a no-go

You said waiting on a specific event is a no-go. But I receive all events and discard those that don't match my event, because I receive them in a while loop that keeps looping until I get the right event.

But I don't get why it blocks when subscribing. Or do you mean there is also a queue onto which subscribers are pushed, and it is limited to n subscribers per event?

Oh well. Who says that async is easy?

That's true. At some point you think you understand everything and everything works, and then you reach the point where you understand nothing.

teamplayer3, Dec 20 '22

(UPDATE: Edited the code for extra clarity.)

I checked my problem with receiving from sub_1 and it blocks as well.

Mind sharing how exactly you fixed that?

You said waiting on a specific event is a no-go. But I receive all events and discard those that don't match my event, because I receive them in a while loop that keeps looping until I get the right event.

My point is - again - in the presence of skipping subscribers (which you don't have in your code), you can't rely on waiting for a specific event, because - due to timing issues - that event might get skipped and you might never see it!
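To make the skipping concrete, here is a small blocking stand-in for a Signal-like primitive, using only std. The name `LatestSignal` is hypothetical; embassy-sync's `Signal` is async and its API differs in the details. A new `signal` overwrites any value nobody has taken yet, so an event posted just before another one is simply lost to the waiter:

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical blocking stand-in for a Signal: holds at most one value,
/// and a new `signal` overwrites any value that has not been taken yet.
struct LatestSignal<T> {
    slot: Mutex<Option<T>>,
    cond: Condvar,
}

impl<T> LatestSignal<T> {
    fn new() -> Self {
        Self { slot: Mutex::new(None), cond: Condvar::new() }
    }

    /// Never blocks: overwrites the previous, unconsumed value.
    fn signal(&self, value: T) {
        *self.slot.lock().unwrap() = Some(value);
        self.cond.notify_one();
    }

    /// Blocks until a value is present, then takes it.
    fn wait(&self) -> T {
        let mut slot = self.slot.lock().unwrap();
        loop {
            if let Some(v) = slot.take() {
                return v;
            }
            slot = self.cond.wait(slot).unwrap();
        }
    }
}

fn main() {
    let sig = LatestSignal::new();
    // The event loop posts two events before the subscriber gets to run...
    sig.signal("ScanDone");
    sig.signal("StaConnected");
    // ...so the skipping subscriber only sees the latest one.
    println!("{}", sig.wait()); // prints "StaConnected"; ScanDone was skipped
}
```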

But I don't get why it blocks when subscribing. Or do you mean there is also a queue onto which subscribers are pushed, and it is limited to n subscribers per event?

It blocks on subscribing because the subscribing code needs to acquire the queue lock. However, when there are outstanding events that some consumer has not consumed yet, this lock is held by the system, which keeps waiting until that consumer consumes the event. Since you do both the subscription and the consumption of your events from the same thread (i.e. you are trying to subscribe instead of consuming events), the queue remains locked forever.
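The lock problem can be modeled with a `Mutex` in plain std Rust. This is a hypothetical model of the mechanism described above, not the actual ESP IDF code: a "system" thread holds the queue lock until the slow consumer acknowledges its pending event, and `try_lock` is used so the demo reports the would-be deadlock instead of hanging:

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, TryLockError};
use std::thread;

/// Hypothetical `subscribe`: needs the same lock the "system" holds while
/// waiting for a slow consumer. `lock()` would block; `try_lock()` reports it.
fn try_subscribe(queue: &Mutex<Vec<&'static str>>, name: &'static str) -> bool {
    match queue.try_lock() {
        Ok(mut subs) => {
            subs.push(name);
            true
        }
        Err(TryLockError::WouldBlock) => false,
        Err(TryLockError::Poisoned(_)) => panic!("queue lock poisoned"),
    }
}

fn main() {
    let queue = Arc::new(Mutex::new(vec!["sub_1"]));
    let (ack_tx, ack_rx) = channel::<()>();
    let (held_tx, held_rx) = channel::<()>();

    // "System" thread: takes the lock, then waits until sub_1 consumes its event.
    let q = queue.clone();
    let system = thread::spawn(move || {
        let _guard = q.lock().unwrap();
        held_tx.send(()).unwrap();
        ack_rx.recv().unwrap(); // blocks until sub_1's event is consumed
    });
    held_rx.recv().unwrap(); // make sure the lock is actually held

    // The thread that should be consuming sub_1's events tries to subscribe instead.
    assert!(!try_subscribe(&queue, "sub_2")); // with lock(), this would hang forever

    // Consuming the pending event (e.g. from another task) releases the lock.
    ack_tx.send(()).unwrap();
    system.join().unwrap();
    assert!(try_subscribe(&queue, "sub_2"));

    let subs = queue.lock().unwrap();
    println!("{:?}", *subs);
}
```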

That's true. At some point you think you understand everything and everything works, and then you reach the point where you understand nothing.

Persistence usually helps. :)

If you share your code, I might be able to help. But then again, in general the safest option is to just use skipping subscribers. The easiest way to simulate these is with a Signal or a Notification that is armed from the event bus callback (and which, when armed, potentially overwrites its previous value). But then again, forget about assuming that you will see ALL events by awaiting on a channel.

On the other hand, I can't stress enough how useful Signal actually is. My RUWM project is 99.9% built on Signal and its weaker variant, Notification. No blocking, no issues. And you can almost always work around the fact that Signal/Notification overwrite, using extra knowledge from your concrete use case. For example, you can use a simple let my_notif = Arc::new(Notification::new()) and then, in the callback, arm the notification only if the received event is ScanDone. As in:

let my_notif = Arc::new(Notification::new());

// Keep the subscription alive until the scan is done; dropping it unsubscribes.
let sub_2 = {
    let my_notif = my_notif.clone();

    // Non-async, callback-based subscription
    event_bus.subscribe(move |event: &WifiEvent| {
        if matches!(event, WifiEvent::ScanDone) {
            my_notif.notify();
        }
    })?
};

// Note that with the logic below you have to subscribe _before_ `start_scan` is called.
// Note also that `my_notif.notify()` a) is non-blocking and completes immediately, and
// b) as described above, "overwrites" the previous value; in your use case that does
// not matter, as you are not expecting multiple "scan done" events anyway.
wifi_driver.start_scan(&ScanConfig::default(), false)?;

// Wait for the scan to be done
my_notif.wait().await;

ivmarkov, Dec 20 '22

Thanks again for the clarification.

Mind sharing how exactly you fixed that?

I recv() the events in an extra task to drain the queue.
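The workaround of draining the otherwise-unused subscription from an extra task can be sketched with a thread and a bounded `std::sync::mpsc` channel, as stand-ins for the async subscription and the extra async task (the capacity and event count are arbitrary):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Posts `events` messages through a queue of size `capacity` while a
/// dedicated drainer keeps consuming, so the producer never blocks for good.
fn run(events: usize, capacity: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);

    // Stand-in for "recv() the events in an extra task".
    let drainer = thread::spawn(move || rx.iter().count());

    for ev in 0..events {
        tx.send(ev).unwrap(); // without the drainer, this hangs once the queue is full
    }
    drop(tx); // close the channel so the drainer's iterator ends
    drainer.join().unwrap()
}

fn main() {
    println!("drained {} events through a queue of capacity 2", run(100, 2));
}
```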

I will try it with Signal/Notification, since you recommend it.

There are so many Rust sync primitives out there; it takes time to learn them all and to know which ones are best in which context. I often prefer to implement raw futures, but those are not 100% tested for every case.

teamplayer3, Dec 20 '22

Coming back to the AsyncEventBus: it should work if I only subscribe to it for the period during which I need the events, right?

teamplayer3, Dec 20 '22

Summarizing what @ivmarkov suggested, the problem is that sub_1 already consumed the event. Therefore, one of the best possible workarounds right now is to add a message broker/multiplexer that "clones" each event and sends it to multiple handlers. Another issue, however, is that calls like esp_wifi_scan_get_ap_records are also one-shot: the records will not be available again to any handler other than the first one that collected the Wi-Fi scan records.
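A broker of the kind described above can be sketched in std Rust. `Broker` and `Event` are hypothetical types, not part of esp-idf-svc: a single upstream subscription would feed `publish`, which clones each event to every registered handler. One-shot data such as the AP records is fetched once and wrapped in an `Arc`, so every handler shares the same records instead of calling the one-shot API again:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical multiplexer: fans each event out (cloned) to every handler.
struct Broker<T: Clone> {
    outputs: Vec<Sender<T>>,
}

impl<T: Clone> Broker<T> {
    fn new() -> Self {
        Self { outputs: Vec::new() }
    }

    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.outputs.push(tx);
        rx
    }

    fn publish(&mut self, event: T) {
        // Clone the event for every live handler; drop handlers that went away.
        self.outputs.retain(|tx| tx.send(event.clone()).is_ok());
    }
}

#[derive(Clone, Debug, PartialEq)]
enum Event {
    // The one-shot AP records, fetched once and shared by reference counting.
    ScanDone(Arc<Vec<String>>),
}

fn main() {
    let mut broker = Broker::new();
    let h1 = broker.subscribe();
    let h2 = broker.subscribe();

    let records = Arc::new(vec!["ap-1".to_string()]); // fetched exactly once
    broker.publish(Event::ScanDone(records));

    // Both handlers see the same event and the same records.
    let (e1, e2) = (h1.recv().unwrap(), h2.recv().unwrap());
    assert_eq!(e1, e2);
    println!("{:?}", e1);
}
```

Note that `std::sync::mpsc::channel` is unbounded and allocates on the heap, which is the heap-allocation trade-off this approach implies.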

This behavior is similar to std::future::Future, as futures are also supposed to be one-shot: once a future has returned Ready, you should not poll it again.

Drawing a parallel from this, I think we can employ a similar tactic to the one used when a future may need to be polled multiple times: fuse it. However, this implies that heap memory buffers are definitely needed, which will make the firmware less deterministic.
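For reference, fusing can be sketched with std only. `Fuse` below is a hypothetical minimal version in the spirit of `futures::future::Fuse`: once the inner future has returned `Ready`, further polls return `Pending` instead of polling the finished future again (which, for `std::future::Ready`, would panic). Fusing by itself only makes repeated polling safe; handing the same value to several consumers still requires storing it somewhere. The no-op waker exists only so the future can be polled by hand:

```rust
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Minimal fuse combinator (hypothetical sketch): after the inner future
/// completes, it is dropped and every later poll returns `Pending`.
struct Fuse<F> {
    inner: Option<F>,
}

impl<F: Future + Unpin> Future for Fuse<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let this = self.get_mut();
        let out = match this.inner.as_mut() {
            None => return Poll::Pending, // already terminated
            Some(fut) => Pin::new(fut).poll(cx),
        };
        if out.is_ready() {
            this.inner = None; // never poll the finished future again
        }
        out
    }
}

/// A waker that does nothing, just enough to poll futures by hand.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fused = Fuse { inner: Some(std::future::ready(42)) };

    let first = Pin::new(&mut fused).poll(&mut cx);
    let second = Pin::new(&mut fused).poll(&mut cx);
    println!("{:?} then {:?}", first, second); // prints "Ready(42) then Pending"
}
```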

stevefan1999-personal, Jan 14 '23