rumqtt
Non-blocking receive never notifies
Expected Behavior
Cases 1, 2 and 3 should print the same notifications.
Current Behavior
Only case 1 prints notifications; cases 2 and 3 print none.
Context
I am trying to integrate non-blocking receive into an existing event loop, but try_recv() and recv_timeout() never generate meaningful notifications. recv() does generate notifications, but it blocks.
use rumqttc::{Client, MqttOptions, QoS, RecvTimeoutError, TryRecvError};
use std::time::Duration;

const CASE: u8 = 1;

fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    let (client, mut connection) = Client::new(mqttoptions, 10);
    client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
    loop {
        // My own event loop
        match CASE {
            1 => {
                // Output:
                // Notification = Ok(Ok(Incoming(ConnAck(ConnAck { session_present: false, code: Success }))))
                // Notification = Ok(Ok(Outgoing(Subscribe(1))))
                // Notification = Ok(Ok(Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtMostOnce)] }))))
                // Notification = Ok(Ok(Incoming(Publish(Topic = hello/rumqtt, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 13))))
                let notification = connection.recv();
                println!("Notification = {:?}", notification);
            }
            2 => {
                // No output
                let notification = connection.try_recv();
                if !matches!(notification, Err(TryRecvError::Empty)) {
                    println!("Notification = {:?}", notification);
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            3 => {
                // No output
                let notification = connection.recv_timeout(Duration::from_millis(1));
                if !matches!(notification, Err(RecvTimeoutError::Timeout)) {
                    println!("Notification = {:?}", notification);
                }
            }
            _ => (),
        }
    }
}
[dependencies]
rumqttc = "0.24.0"
uname -a
Linux 5.10.16.3-microsoft-standard-WSL2 #1 SMP Fri Apr 2 22:23:49 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
hey, thanks for reporting.
in case 3, you can increase the timeout duration, e.g. connection.recv_timeout(Duration::from_secs(1)), it will work.
for case 2, i will get back to you!
Requests take a while to be processed, so both try_recv and recv_timeout with a 1 ms timeout fail to respond in time, and hence the connection is never established. This is unfortunate, but it seems that, at least for sync code, we should deprecate try_recv and add a note discouraging extremely small timeouts for recv_timeout.
We need a sync-specific EventLoop, but that is clearly not a priority, so it is up to anyone interested to contribute.
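To make the explanation above concrete, here is a toy model of why a very small timeout can starve the connection. It rests on the assumption that progress made during a timed-out call is discarded, which is one reading of the comment above. `ToyHandshake` and `ever_connects` are hypothetical names, and nothing here uses rumqttc or reflects its internals.

```rust
use std::time::Duration;

/// Toy stand-in for a connection attempt that needs `needed` of
/// uninterrupted processing before it yields its first event.
/// Hypothetical type; it does not use or mirror rumqttc's internals.
struct ToyHandshake {
    needed: Duration,
}

impl ToyHandshake {
    /// Models one receive call with a timeout. Assumption: progress is
    /// discarded when the call times out, so every call starts from zero.
    fn recv_timeout(&self, timeout: Duration) -> Result<&'static str, &'static str> {
        if timeout >= self.needed {
            Ok("ConnAck")
        } else {
            Err("Timeout")
        }
    }
}

/// Calls `recv_timeout` up to `attempts` times and reports whether any call succeeded.
fn ever_connects(handshake: &ToyHandshake, timeout: Duration, attempts: u32) -> bool {
    (0..attempts).any(|_| handshake.recv_timeout(timeout).is_ok())
}
```

Under this model, a handshake that needs 50 ms never completes with a 1 ms timeout no matter how often the call is repeated, while a single call with a 1 s timeout succeeds.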
From a user's perspective, the timeout in recv_timeout() should apply to the reception of a notification, and be decoupled from the timeouts of connections that are managed inside the stack.
If try_recv() gets removed, I would reach for recv_timeout(0ms), which would still be wrong. Adding a note would be helpful, but what would be an acceptable yet small timeout? It all seems a bit finicky and leads to surprising behavior.
For now I moved the connection to a separate thread and post notifications into a queue that can be read without blocking from my own event loop. This has worked without issues so far.
let (mut client, mut connection) = Client::new(mqttoptions, 100);
let (send_event, recv_event) = std::sync::mpsc::channel();
std::thread::spawn(move || {
    for notification in connection.iter() {
        send_event.send(notification).unwrap();
    }
});
...
loop {
    if let Ok(notification) = recv_event.try_recv() {
        println!("Notification = {:?}", notification);
    }
}
Maybe it is possible to do something similar in the Connection implementation. This could fix the issue without deprecation.
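As a rough illustration of that idea, the thread-plus-queue workaround could be packaged into a small reusable wrapper. The sketch below uses only the standard library. `BackgroundReceiver` and its methods are hypothetical names, not part of rumqttc, and the blocking source is modeled as a plain closure rather than a real Connection.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;

/// Hypothetical helper (not part of rumqttc): runs a blocking receive
/// function on a background thread and exposes the results through a
/// non-blocking `try_recv`.
pub struct BackgroundReceiver<T> {
    rx: Receiver<T>,
}

impl<T: Send + 'static> BackgroundReceiver<T> {
    /// Spawns a thread that keeps calling `blocking_recv` and queues each
    /// item it returns. The thread stops when `blocking_recv` returns `None`
    /// or when this receiver has been dropped.
    pub fn spawn<F>(mut blocking_recv: F) -> Self
    where
        F: FnMut() -> Option<T> + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            while let Some(item) = blocking_recv() {
                // Stop forwarding once the receiving side is gone.
                if tx.send(item).is_err() {
                    break;
                }
            }
        });
        Self { rx }
    }

    /// Returns immediately: `Ok(item)` if one is queued, `Err(Empty)` if
    /// nothing is queued yet, and `Err(Disconnected)` once the background
    /// thread has finished and the queue has been drained.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.rx.try_recv()
    }
}
```

With a real client, the closure passed to `spawn` would presumably own the Connection and call its blocking recv(), so that the connection keeps being driven on the background thread while the caller's event loop only ever touches the queue. The channel here is unbounded, so a slow consumer lets the queue grow; a bounded `sync_channel` would be one way to apply backpressure.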