dbus-rs

Large messages require multiple `Connection::process()` calls to be received.

Status: Open. Toqozz opened this issue 2 years ago (10 comments).

This is a really weird issue.

Using the following main loop (blocking):

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    // init_bus() is defined in the gist linked below.
    let (mut cr, c) = init_bus(sender);

    // cr.serve(&c); equivalent
    c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
        cr.handle_message(msg, conn).unwrap();
        true
    }));

    loop {
        dbg!("loop");

        let _result = c.process(std::time::Duration::from_millis(0));
        if let Ok(notif) = receiver.try_recv() {
            dbg!(notif);
        }

        std::thread::sleep(std::time::Duration::from_millis(1000));
    }
}

For large messages (e.g. org.freedesktop.Notifications calls), this needs multiple loop iterations before anything is actually received. If I sleep for only 1ms instead, messages are received very quickly.

I'm guessing that messages are chunked in some way and Connection::process() needs to be called multiple times to receive the whole message. I can see that if I remove the sleep and set the process() timeout to 1000ms, it runs many loops as soon as I send a message, so it seems to know that it's trying to process an incomplete message.

Perhaps Connection::process() only processes one message "chunk" at a time? That would make sense. This behaviour would be OK if I could somehow tell that messages are pending, so that I could spin and process them all. I looked at the Result of c.process(), but it doesn't seem to indicate that messages are waiting. I suspect the correct way to get this information is somehow through Channel, but I tried a few things and couldn't get it to work.
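To make the guess above concrete, here is a std-only toy model (not dbus-rs or libdbus code) of a connection whose process() reads at most one fixed-size chunk per call and dispatches a message only once all of its bytes have arrived. ToyConnection, has_unread_bytes and the 2048-byte CHUNK_SIZE are illustrative assumptions; the real read size and buffering inside libdbus may differ.

```rust
use std::collections::VecDeque;

// Hypothetical per-call read size; an assumption, not libdbus's real value.
const CHUNK_SIZE: usize = 2048;

/// Toy model: `process()` reads at most one chunk from the "socket" per call
/// and only reports success once a whole message has been assembled.
struct ToyConnection {
    socket: VecDeque<u8>, // bytes sent by the peer but not yet read
    partial: Vec<u8>,     // bytes of the message being assembled
    expected_len: usize,  // total size of that message (0 = none expected)
}

impl ToyConnection {
    fn new(message: Vec<u8>) -> Self {
        let expected_len = message.len();
        ToyConnection { socket: VecDeque::from(message), partial: Vec::new(), expected_len }
    }

    /// Returns true if a complete message was dispatched during this call.
    fn process(&mut self) -> bool {
        let n = self.socket.len().min(CHUNK_SIZE);
        self.partial.extend(self.socket.drain(..n));
        if self.expected_len > 0 && self.partial.len() == self.expected_len {
            self.partial.clear();
            self.expected_len = 0; // this toy carries only one message
            return true;
        }
        false
    }

    /// The "messages are pending" signal the issue asks for.
    fn has_unread_bytes(&self) -> bool {
        !self.socket.is_empty()
    }
}

/// Counts how many `process()` calls it takes before the message is dispatched.
fn calls_needed(message_len: usize) -> usize {
    assert!(message_len > 0, "the toy model needs a non-empty message");
    let mut conn = ToyConnection::new(vec![0u8; message_len]);
    let mut calls = 1;
    while !conn.process() {
        calls += 1;
    }
    calls
}

fn main() {
    for len in [1000, 2048, 5000] {
        println!("{len}-byte message: {} process() calls", calls_needed(len));
    }

    // With a pending-bytes signal, a caller can spin until the socket is drained.
    let mut conn = ToyConnection::new(vec![0u8; 5000]);
    let mut dispatched = conn.process();
    while conn.has_unread_bytes() {
        dispatched |= conn.process();
    }
    println!("dispatched after draining: {dispatched}");
}
```

In this model, has_unread_bytes() is the kind of signal the paragraph above is looking for; whether libdbus exposes an equivalent is the open question in this thread.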

I suspect I could also solve this by just running SyncConnection::process() in a separate thread, but I'm not very familiar with how the Sync traits interact. Is there an equivalent of the following that uses a SyncConnection?

c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
    cr.handle_message(msg, conn).unwrap();
    true
}));

Here's a gist for full repro: https://gist.github.com/Toqozz/4c91deb2a38c09a7804d7c6e4c706d0a

You can test by running notify-send "<summary>" where <summary> is really large (more than roughly 300 bytes).

Toqozz, Dec 04 '21 04:12

Hmm...interesting issue. Does something like this help:

c.channel().set_watch_enabled(true);
loop {
    loop {
        c.process(std::time::Duration::from_millis(0)).unwrap();
        if !c.channel().watch().read { break; }
    }
    /* process incoming messages, sleep etc */
}

diwic, Dec 06 '21 07:12

This fails because Channel::set_watch_enabled(true) takes &mut self, but Connection::channel() returns &Channel. I'm not sure how to get a mutable reference to the channel without manually creating the whole connection, which doesn't seem feasible.

error[E0596]: cannot borrow data in a `&` reference as mutable
  --> src/main.rs:93:5
   |
93 |     c.channel().set_watch_enabled(true);
   |     ^^^^^^^^^^^ cannot borrow as mutable

I've sort of worked around this by having a global connection variable and doing conn.process() in a separate thread, but I feel like this isn't safe, even though it's only an immutable reference.

use dbus::blocking::Connection;
use dbus::channel::Sender;
use dbus::Message;
use std::time::Duration;

static mut DBUS_CONN: Option<Connection> = None;

pub fn init_bus() {
    let c = Connection::new_session().expect("Failed to get a session bus.");
    let reply = c
        .request_name("org.freedesktop.Notifications", false, true, false)
        .expect("Failed to register name.");

    // etc...

    unsafe {
        DBUS_CONN = Some(c);
    }
}

pub fn get_conn() -> &'static Connection {
    unsafe {
        assert!(DBUS_CONN.is_some());
        DBUS_CONN.as_ref().unwrap()
    }
}

// later, from thread 2
pub fn process_dbus() {
    let conn = get_conn();
    loop {
        conn.process(Duration::from_millis(1000)).unwrap();
    }
}

// later, from thread 1
pub fn send_message(msg: Message) {
    let conn = get_conn();
    conn.send(msg).unwrap();
}

Toqozz, Dec 07 '21 02:12

> I'm not sure how to get a mutable reference to channel without manually creating the whole connection, which doesn't seem feasible.

use dbus::blocking::Connection;
use dbus::channel::{BusType, Channel};

let mut ch = Channel::get_private(BusType::Session).unwrap();
ch.set_watch_enabled(true);
let conn: Connection = ch.into();

> I've sort of worked around this by having a global connection variable and doing conn.process() in a separate thread, but I feel like this isn't safe, even though it's only an immutable reference.

You can do this without unsafe with SyncConnection, it's one of the things SyncConnection is for. Just store the SyncConnection in an Arc and share it between threads.
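A minimal std-only sketch of that Arc pattern, assuming (as the thread suggests) that SyncConnection is Send + Sync and that its send and process methods take &self. FakeSyncConnection and run are hypothetical stand-ins, not dbus-rs types, so the example compiles without a bus; the point is that one Arc clone per thread replaces the static mut global and its unsafe accessor.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for SyncConnection: every method takes &self and the
// type is Send + Sync, so an Arc of it can be shared between threads.
struct FakeSyncConnection {
    sent: Mutex<Vec<String>>,
    process_calls: AtomicUsize,
}

impl FakeSyncConnection {
    fn send(&self, msg: &str) {
        self.sent.lock().unwrap().push(msg.to_string());
    }

    // Stand-in for process(timeout): counts calls and waits out the timeout.
    fn process(&self, timeout: Duration) {
        self.process_calls.fetch_add(1, Ordering::Relaxed);
        thread::sleep(timeout);
    }
}

/// Runs a processing thread while the current thread sends `messages`,
/// both through one shared connection. Returns (messages sent, process calls).
fn run(messages: &[&str]) -> (usize, usize) {
    let conn = Arc::new(FakeSyncConnection {
        sent: Mutex::new(Vec::new()),
        process_calls: AtomicUsize::new(0),
    });
    let stop = Arc::new(AtomicBool::new(false));

    // "Thread 2": the processing loop owns its own Arc clone; no static mut.
    let worker = {
        let conn = Arc::clone(&conn);
        let stop = Arc::clone(&stop);
        thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                conn.process(Duration::from_millis(1));
            }
        })
    };

    // "Thread 1": sends through the same connection without any unsafe.
    for msg in messages {
        conn.send(msg);
    }

    stop.store(true, Ordering::Relaxed);
    worker.join().unwrap();
    let sent = conn.sent.lock().unwrap().len();
    let calls = conn.process_calls.load(Ordering::Relaxed);
    (sent, calls)
}

fn main() {
    let (sent, calls) = run(&["a", "b", "c"]);
    println!("sent {sent} messages; the worker called process() {calls} times");
}
```

Compared with the static mut version, the compiler now checks the Send/Sync requirements instead of relying on an unsafe block.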

Nevertheless I'm not sure whether I should try to fix this somehow (and if so I'm not sure how) or document it as part of the limitations of the libdbus api. It's definitely not something you would expect...

diwic, Dec 07 '21 18:12

> c.channel().set_watch_enabled(true);
> loop {
>     loop {
>         c.process(std::time::Duration::from_millis(0)).unwrap();
>         if !c.channel().watch().read { break; }
>     }
>     /* process incoming messages, sleep etc */
> }

It looks like c.channel().watch().read is true regardless of whether a large message is being "processed" or not, so no go here.

> You can do this without unsafe with SyncConnection, it's one of the things SyncConnection is for. Just store the SyncConnection in an Arc and share it between threads.

Yeah, I mentioned this previously:

> I suspect I could also solve this by just running SyncConnection::process() in a separate thread, but I'm not very familiar with how the Sync traits interact. Is there an equivalent of the following that uses a SyncConnection?
>
> c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
>     cr.handle_message(msg, conn).unwrap();
>     true
> }));

I get a super obtuse Rust error when I try the above (c is SyncConnection):

error[E0277]: `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)` cannot be shared between threads safely
  --> src/main.rs:87:66
   |
87 |       c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
   |  __________________________________________________________________^
88 | |         cr.handle_message(msg, conn).unwrap();
89 | |         true
90 | |     }));
   | |______^ `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)`
   = note: required because of the requirements on the impl of `Sync` for `Unique<(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)>`
   = note: required because it appears within the type `Box<(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::CallbackDbg`
   = note: required because it appears within the type `Option<dbus_crossroads::ifacedesc::CallbackDbg>`
   = note: required because it appears within the type `MethodDesc`
   = note: required because it appears within the type `(Member<'static>, MethodDesc)`
   = note: required because of the requirements on the impl of `Sync` for `hashbrown::raw::RawTable<(Member<'static>, MethodDesc)>`
   = note: required because it appears within the type `hashbrown::map::HashMap<Member<'static>, MethodDesc, RandomState>`
   = note: required because it appears within the type `HashMap<Member<'static>, MethodDesc>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::IfaceDesc`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `alloc::raw_vec::RawVec<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `Vec<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::Registry`
   = note: required because it appears within the type `Crossroads`
   = note: required because it appears within the type `[closure@src/main.rs:87:75: 90:6]`
   = note: required for the cast to the object type `dyn for<'r> FnMut(Message, &'r SyncConnection) -> bool + Send + Sync`

I can do it by putting cr in a Mutex, but this seems to defeat the purpose of having a SyncConnection in the first place.


I apologise for my ignorance; my knowledge in this area is seriously lacking.

Toqozz, Dec 07 '21 23:12

> I can do it by putting cr in a Mutex

Yep, there is no SyncCrossroads so that's what you need to do.
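A std-only sketch of why the E0277 error above appears and why the Mutex fixes it. Router is a hypothetical stand-in for Crossroads (it owns a boxed callback that is Send but not Sync, like the MethodDesc chain in the error), and Callback is a hypothetical stand-in for the `Send + Sync` callback bound that the error shows start_receive requiring on a SyncConnection; neither is the real dbus-rs API.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for Crossroads: it owns a boxed callback that is
// Send but not Sync, so Router is Send but not Sync.
struct Router {
    handler: Box<dyn FnMut(&str) -> String + Send>,
}

impl Router {
    fn handle_message(&mut self, msg: &str) -> String {
        (self.handler)(msg)
    }
}

// Hypothetical stand-in for the callback bound in the error above:
// the callback must be Send + Sync so any thread may invoke it.
type Callback = Arc<dyn Fn(&str) -> String + Send + Sync>;

fn make_callback(router: Router) -> Callback {
    // Capturing `router` directly would fail with E0277 (Router is not Sync).
    // Mutex<T> is Sync whenever T: Send, so wrapping it satisfies the bound.
    let router = Mutex::new(router);
    Arc::new(move |msg: &str| router.lock().unwrap().handle_message(msg))
}

// A router whose handler counts how many messages it has seen.
fn counting_router() -> Router {
    let mut count = 0u32;
    Router {
        handler: Box::new(move |msg: &str| {
            count += 1;
            format!("reply #{count} to {msg}")
        }),
    }
}

fn main() {
    let cb = make_callback(counting_router());
    let workers: Vec<_> = (0..4)
        .map(|i| {
            let cb = Arc::clone(&cb);
            thread::spawn(move || cb(&format!("msg{i}")))
        })
        .collect();
    for w in workers {
        println!("{}", w.join().unwrap());
    }
}
```

In this sketch the Mutex only serializes calls into the handler itself; code that merely sends through the shared connection never touches that lock, so sharing the connection between threads is still useful.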

Let's try something else. What if you had:

c.process(std::time::Duration::from_millis(1))

...instead of from_millis(0), and then sleep for a second between invocations, would that speed up the processing? If so, would it be a suitable workaround for you?

diwic, Dec 08 '21 06:12

Hmm...in dbus-tokio, we have some kind of strange loop which calls libc::recv. Maybe that's something that's needed here as well?

https://github.com/diwic/dbus-rs/blob/master/dbus-tokio/src/connection.rs#L170
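The dbus-tokio code linked above loops around libc::recv; as far as we can tell, the idea is to keep reading while the socket still has data instead of stopping after one read. Below is a std-only sketch of that drain-until-WouldBlock pattern on a Unix socket pair. It substitutes a plain non-blocking read for libc::recv so it needs no extra crates; CHUNK_SIZE, read_one_chunk, drain and demo are hypothetical names, and this is not how dbus-rs or libdbus actually read from the bus (Unix-only).

```rust
use std::io::{ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;

// Hypothetical per-call read size; an assumption, not libdbus's real buffer size.
const CHUNK_SIZE: usize = 2048;

/// One read step: pull at most CHUNK_SIZE bytes from a non-blocking socket.
/// Returns Ok(0) when nothing is available right now (WouldBlock) or on EOF.
fn read_one_chunk(sock: &mut UnixStream, buf: &mut Vec<u8>) -> std::io::Result<usize> {
    let mut chunk = [0u8; CHUNK_SIZE];
    match sock.read(&mut chunk) {
        Ok(n) => {
            buf.extend_from_slice(&chunk[..n]);
            Ok(n)
        }
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(0),
        Err(e) => Err(e),
    }
}

/// Repeats the read step until the socket has nothing more to give,
/// instead of stopping after the first chunk. Returns the number of steps.
fn drain(sock: &mut UnixStream, buf: &mut Vec<u8>) -> std::io::Result<usize> {
    let mut steps = 0;
    loop {
        steps += 1;
        if read_one_chunk(sock, buf)? == 0 {
            return Ok(steps);
        }
    }
}

/// Writes `len` bytes into one end of a socket pair and drains the other end.
/// Returns (bytes received, read steps taken).
fn demo(len: usize) -> std::io::Result<(usize, usize)> {
    let (mut writer, mut reader) = UnixStream::pair()?;
    reader.set_nonblocking(true)?;
    writer.write_all(&vec![b'x'; len])?; // one large "message"
    let mut buf = Vec::new();
    let steps = drain(&mut reader, &mut buf)?;
    Ok((buf.len(), steps))
}

fn main() -> std::io::Result<()> {
    let (bytes, steps) = demo(5000)?;
    println!("received {bytes} bytes in {steps} read steps");
    Ok(())
}
```

A single read_one_chunk call would return only the first 2048 bytes here; looping until WouldBlock collects the whole message in one pass.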

diwic, Dec 08 '21 07:12

> Let's try something else. What if you had:
>
> c.process(std::time::Duration::from_millis(1))
>
> ...instead of from_millis(0), and then sleep for a second between invocations, would that speed up the processing? If so, would it be a suitable workaround for you?

The issue happens whether it's c.process(0) or c.process(1000). It seems like c.process() only processes a single "chunk" on each invocation, so this would still have the issue.

> Hmm...in dbus-tokio, we have some kind of strange loop which calls libc::recv. Maybe that's something that's needed here as well?
>
> https://github.com/diwic/dbus-rs/blob/master/dbus-tokio/src/connection.rs#L170

The issue linked in the comment above that code (#254) looks very similar to this one, so that seems likely.

Toqozz, Dec 08 '21 08:12

So if process(1000) would not receive a message > 2048 bytes immediately, that seems even weirder; I'd call it a relatively serious bug in libdbus if that's the case, and it should be reported there and discussed. Dbus-rs has been around for a really long time and libdbus for far longer, so can you double check before we go upstream with it?

diwic, Dec 10 '21 07:12

I can confirm this is the case. I don't think it's a timing issue, though; it seems like it could be by design (although I can't find any mention of this behavior in the D-Bus documentation). I think c.process() (and maybe dbus_connection_pop_message()) always receives only the first fixed-size chunk, regardless of the timeout.

loop {
    dbg!("loop");
    c.process(Duration::from_millis(1000)).unwrap();
    if let Ok(notif) = receiver.try_recv() {
        dbg!(notif);
    }

    std::thread::sleep(std::time::Duration::from_millis(100))
}

https://user-images.githubusercontent.com/6947819/145648428-203154f2-b318-46df-b336-c57d5a1fd317.mp4

(my top terminal breaks displaying the message because it is so large)

Toqozz, Dec 10 '21 22:12

Filed https://gitlab.freedesktop.org/dbus/dbus/-/issues/364 - let's see what they say.

diwic, Dec 11 '21 09:12