
Recv_timeout does not seem to be working

Open · SirVer opened this issue 3 years ago

Calling the newly added recv_timeout gives me the following error:

thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:26

Using try_recv instead gives the same error.

SirVer avatar Sep 26 '22 19:09 SirVer
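For context, the panic text comes from tokio's timer machinery rather than from rumqttc's MQTT handling: with tokio 1.17 (the version in the path above), merely constructing a timeout future outside of a runtime fails with the same message. A minimal sketch, assuming a plain binary with tokio's time feature enabled and no #[tokio::main]:

use std::time::Duration;

fn main() {
    // There is no Tokio runtime on this thread, so the Sleep timer behind
    // tokio::time::timeout cannot find a timer handle and panics with:
    // "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
    let _timeout = tokio::time::timeout(Duration::from_secs(1), async {});
}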

This is the code I am using:

// Snippet from main(); args, logfile, MIN_REACTION_TIME, and handle_message
// are defined elsewhere in the program.
'outer: loop {
    // To make our client name unique, we append the current time.
    let now = Utc::now().to_rfc3339();
    let mut mqttoptions =
        MqttOptions::new(format!("mqtt_logger_{now}"), &args.hostname, args.port);
    mqttoptions.set_keep_alive(Duration::from_secs(60));

    let (mut client, mut connection) = Client::new(mqttoptions, 10);
    for t in &args.topics {
        client.subscribe(t, QoS::ExactlyOnce)?;
    }

    use rumqttc::Event::*;
    use rumqttc::Packet::*;
    loop {
        loop {
            let notification = match connection.recv_timeout(MIN_REACTION_TIME) {
                Ok(rv) => rv,
                Err(RecvTimeoutError::Timeout) => break,
                Err(RecvTimeoutError::Disconnected) => continue 'outer,
            };

            let n = match notification {
                Ok(n) => n,
                Err(e) => {
                    eprintln!("MQTT error. Will reconnect: {e}.");
                    continue 'outer;
                }
            };

            let res = match n {
                Incoming(Publish(data)) => {
                    handle_message(&data.topic, &data.payload, &mut logfile, args.quiet)
                }
                Incoming(_) | Outgoing(_) => Ok(()),
            };

            if let Err(e) = res {
                eprintln!("error in handle_message. Will reconnect: {e}.");
                continue 'outer;
            }
        }
    }
} // 'outer

SirVer avatar Sep 26 '22 19:09 SirVer

I am not sure what exactly triggers this, but I believe the problem stems from an error here while trying to build the runtime on the same thread where one was already built in a previous iteration.

Maybe I'm not making much sense, but frankly this issue is proving hard for me to even comprehend.

de-sh avatar Sep 27 '22 18:09 de-sh

@de-sh Can you reproduce this?

SirVer avatar Oct 02 '22 12:10 SirVer

@SirVer I'll take a look at this soon. Can you give me a small (runnable) example that can reproduce this?

tekjar avatar Oct 02 '22 14:10 tekjar

@tekjar sorry, I somehow missed this request. I'll cobble something together.

SirVer avatar Oct 25 '22 18:10 SirVer

@tekjar Here is a gist containing a Cargo.toml using the latest master of this repo and a small main.rs that, when compiled and executed, gives the following backtrace for me.

$ target/debug/mqtt_logger --hostname localhost
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:26
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/panicking.rs:584:5
   1: core::panicking::panic_fmt
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:142:14
   2: core::panicking::panic_display
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:72:5
   3: core::panicking::panic_str
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/panicking.rs:56:5
   4: core::option::expect_failed
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/option.rs:1874:5
   5: core::option::Option<T>::expect
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/option.rs:738:21
   6: tokio::runtime::context::time_handle::{{closure}}
             at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:54:13
   7: std::thread::local::LocalKey<T>::try_with
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/thread/local.rs:445:16
   8: tokio::runtime::context::time_handle
             at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/context.rs:52:15
   9: tokio::time::driver::handle::Handle::current
             at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/driver/handle.rs:57:13
  10: tokio::time::driver::sleep::Sleep::new_timeout
             at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/driver/sleep.rs:257:22
  11: tokio::time::timeout::timeout
             at /Users/hrapp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/time/timeout.rs:79:27
  12: rumqttc::client::Connection::recv_timeout
             at /Users/hrapp/.cargo/git/checkouts/rumqtt-5c5ad7136a67b3df/29f5b1e/rumqttc/src/client.rs:430:17
  13: mqtt_logger::main
             at ./mqtt_logger/src/main.rs:23:9
  14: core::ops::function::FnOnce::call_once
             at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/core/src/ops/function.rs:248:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Sorry again for the delay in providing the necessary information.

SirVer avatar Oct 25 '22 19:10 SirVer

Hey, entering the runtime context should fix this; you can verify by using the fix-recv-timeout branch:

rumqttc = { default-features = false, git = "https://github.com/bytebeamio/rumqtt", branch = "fix-recv-timeout" }

swanandx avatar Oct 29 '22 17:10 swanandx
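For readers wondering what "entering the runtime context" means here: a tokio Runtime's enter() guard installs a thread-local context, which is what timer construction looks up. A minimal sketch of that mechanism in general, assuming tokio = { version = "1", features = ["full"] } (an illustration only, not the actual patch on the fix-recv-timeout branch):

use std::time::Duration;
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().expect("failed to build a Tokio runtime");

    // While this guard is alive, the thread has a runtime context, so
    // constructing tokio timers from synchronous code no longer panics.
    // (The future still has to be driven on the runtime to actually complete.)
    let _guard = rt.enter();
    let _timeout = tokio::time::timeout(Duration::from_secs(1), async {});
}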

@swanandx Tested and works now as advertised. Thanks for the fix!

SirVer avatar Oct 31 '22 09:10 SirVer