rumqtt
recv_timeout does not seem to be working
Calling the newly added recv_timeout gives me the following error:
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:26
Using try_recv instead gives the same error.
This is the code I am using:
'outer: loop {
    // To make our client name unique, we append the current time.
    let now = Utc::now().to_rfc3339();
    let mut mqttoptions =
        MqttOptions::new(format!("mqtt_logger_{now}"), &args.hostname, args.port);
    mqttoptions.set_keep_alive(Duration::from_secs(60));
    let (mut client, mut connection) = Client::new(mqttoptions, 10);
    for t in &args.topics {
        client.subscribe(t, QoS::ExactlyOnce)?;
    }
    use rumqttc::Event::*;
    use rumqttc::Packet::*;
    loop {
        // Drain notifications until none arrives within MIN_REACTION_TIME.
        loop {
            let notification = match connection.recv_timeout(MIN_REACTION_TIME) {
                Ok(rv) => rv,
                Err(RecvTimeoutError::Timeout) => break,
                Err(RecvTimeoutError::Disconnected) => continue 'outer,
            };
            let n = match notification {
                Ok(n) => n,
                Err(e) => {
                    eprintln!("MQTT error. Will reconnect: {e}.");
                    continue 'outer;
                }
            };
            let res = match n {
                Incoming(Publish(data)) => {
                    handle_message(&data.topic, &data.payload, &mut logfile, args.quiet)
                }
                Incoming(_) | Outgoing(_) => Ok(()),
            };
            if let Err(e) = res {
                eprintln!("error in handle_message. Will reconnect: {e}.");
                continue 'outer;
            }
        }
    }
}
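For readers unfamiliar with the drain-until-timeout pattern in the inner loop, here is a minimal std-only sketch of the same control flow. It uses std::sync::mpsc rather than rumqttc (an assumption made so the example runs without a broker); the helper name drain is hypothetical, and std's RecvTimeoutError merely happens to have the same two variants used in the snippet above.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: collect items until the channel stays quiet for
// `quiet`, or until the sender goes away. The bool reports disconnection.
fn drain(rx: &mpsc::Receiver<u32>, quiet: Duration) -> (Vec<u32>, bool) {
    let mut items = Vec::new();
    loop {
        match rx.recv_timeout(quiet) {
            Ok(v) => items.push(v),
            // Nothing arrived in time: stop draining, the peer is still alive.
            Err(RecvTimeoutError::Timeout) => return (items, false),
            // The sender was dropped: the caller may want to reconnect.
            Err(RecvTimeoutError::Disconnected) => return (items, true),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which disconnects the channel.
    });
    producer.join().unwrap();

    let (items, disconnected) = drain(&rx, Duration::from_millis(50));
    assert_eq!(items, vec![0, 1, 2]);
    assert!(disconnected);
}
```

In the snippet above, the Timeout case maps to `break` and the Disconnected case maps to `continue 'outer`.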
I am not sure what exactly triggered this, but I believe the problem stems from an error here, while trying to build the runtime on the same thread where one was already built in a previous iteration.
Maybe I'm not making much sense, but frankly this issue seems too complicated for me to fully comprehend.
@de-sh Can you reproduce this?
@SirVer I'll take a look at this soon. Can you give me a small (runnable) example that can reproduce this?
@tekjar sorry, I somehow missed this request. I'll cobble something together.
@tekjar Here is a gist containing a Cargo.toml using the latest master of this repo and a small main.rs that, when compiled and executed, gives the following backtrace for me.
$ target/debug/mqtt_logger --hostname localhost
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:26
stack backtrace:
0: rust_begin_unwind
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/panicking.rs:584:5
1: core::panicking::panic_fmt
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:142:14
2: core::panicking::panic_display
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:72:5
3: core::panicking::panic_str
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:56:5
4: core::option::expect_failed
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/option.rs:1874:5
5: core::option::Option<T>::expect
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/option.rs:738:21
6: tokio::runtime::context::time_handle::{{closure}}
at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:13
7: std::thread::local::LocalKey<T>::try_with
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/thread/local.rs:445:16
8: tokio::runtime::context::time_handle
at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:52:15
9: tokio::time::driver::handle::Handle::current
at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/driver/handle.rs:57:13
10: tokio::time::driver::sleep::Sleep::new_timeout
at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/driver/sleep.rs:257:22
11: tokio::time::timeout::timeout
at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/timeout.rs:79:27
12: rumqttc::client::Connection::recv_timeout
at /Users/hrapp/.cargo/git/checkouts/rumqtt-5c5ad7136a67b3df/29f5b1e/rumqttc/src/client.rs:430:17
13: mqtt_logger::main
at ./mqtt_logger/src/main.rs:23:9
14: core::ops::function::FnOnce::call_once
at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/ops/function.rs:248:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Sorry again for the delay in providing necessary information.
Hey, entering the runtime context should fix this. You can verify by using the fix-recv-timeout branch:
rumqttc = { default-features = false, git = "https://github.com/bytebeamio/rumqtt", branch = "fix-recv-timeout" }
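As a rough illustration of why entering the runtime context matters, here is a simplified std-only model of the mechanism visible in the backtrace (a thread-local lookup followed by Option::expect). This is a sketch, not Tokio's or rumqttc's actual code: TimerHandle, CONTEXT, ContextGuard, enter and current_timer are all hypothetical names, and the lookup returns an error where Tokio panics.

```rust
use std::cell::RefCell;

// Hypothetical stand-in for a runtime's timer handle; not a Tokio type.
#[derive(Clone, Debug, PartialEq)]
struct TimerHandle(&'static str);

thread_local! {
    // Each thread records which runtime (if any) it has currently entered.
    static CONTEXT: RefCell<Option<TimerHandle>> = RefCell::new(None);
}

// Guard that restores the previous context when it is dropped.
struct ContextGuard {
    previous: Option<TimerHandle>,
}

impl Drop for ContextGuard {
    fn drop(&mut self) {
        let previous = self.previous.take();
        CONTEXT.with(|ctx| *ctx.borrow_mut() = previous);
    }
}

// Mark the current thread as being inside the given runtime.
fn enter(handle: TimerHandle) -> ContextGuard {
    let previous = CONTEXT.with(|ctx| ctx.borrow_mut().replace(handle));
    ContextGuard { previous }
}

// Models a timer constructor: it needs a handle from the thread-local context.
fn current_timer() -> Result<TimerHandle, &'static str> {
    CONTEXT.with(|ctx| ctx.borrow().clone().ok_or("there is no reactor running"))
}

fn main() {
    // Outside any context the lookup fails; this is where the real panic fires.
    assert!(current_timer().is_err());

    {
        let _guard = enter(TimerHandle("rt"));
        // Inside the context the same call succeeds.
        assert_eq!(current_timer(), Ok(TimerHandle("rt")));
    }

    // The guard was dropped, so the thread is outside the context again.
    assert!(current_timer().is_err());
}
```

In real Tokio code the corresponding calls are Runtime::enter or Handle::enter, which return a guard that must stay alive while timer futures such as tokio::time::timeout are constructed.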
@swanandx Tested and works now as advertised. Thanks for the fix!