zenoh icon indicating copy to clipboard operation
zenoh copied to clipboard

Sending fails to stop properly after the session is closed. ERROR: Route data with unknown scope

Open YuanYuYuan opened this issue 9 months ago • 0 comments

Describe the bug

2024-05-24T09:16:37.076191Z  INFO main ThreadId(01) zenoh::net::runtime: Using ZID: c8fb696718026344604f9b074a3ff1e7
[ping] Started measuring for 5(s)
[ping] RTT(us) p05: 93393, p50: 769794, p95: 1409960, avg: 763211.63
[ping] Recv rate 520142/705828 = 73.69% messages
[ping] Done
2024-05-24T09:16:42.795357Z ERROR rx-0 ThreadId(07) zenoh::handlers: sending on a closed channel
2024-05-24T09:16:42.797147Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.797181Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.797188Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
...
2024-05-24T09:16:42.798251Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.798258Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.798264Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.799943Z  WARN net-0 ThreadId(05) zenoh::net::runtime::orchestrator: Unable to connect to tcp/127.0.0.1:7447! Received a close message (reason MAX_LINKS) in response to an OpenSyn on: TransportLinkUnicast { link: Link { src: tcp/127.0.0.1:44314, dst: tcp/127.0.0.1:7447, mtu: 65535, is_reliable: true, is_streamed: true }, config: TransportLinkUnicastConfig { direction: Outbound, batch: BatchConfig { mtu: 65535, is_streamed: true, is_compression: false } } } at io/zenoh-transport/src/unicast/establishment/open.rs:444.
2024-05-24T09:16:42.800186Z ERROR net-0 ThreadId(05) zenoh::net::runtime::orchestrator: [] Unable to connect to any of [tcp/127.0.0.1:7447]!  at zenoh/src/net/runtime/orchestrator.rs:278.

To reproduce

  1. Add chrono
cargo add chrono
  2. Put the following file in examples and name it z_ping_parallel.rs.
use clap::Parser;
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
    Arc,
};
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::publication::CongestionControl;
use zenoh_examples::CommonArgs;


/// Reproduction program: publishes timestamped pings on `test/ping` from a
/// background task as fast as possible while the main task receives the
/// echoes on `test/pong`, then prints round-trip statistics and returns.
///
/// NOTE(review): the background task is never joined, so it can still be
/// publishing when `main` returns and the session is dropped — presumably
/// that is the window in which the errors from the report are logged.
#[tokio::main]
async fn main() {
    // initiate logging
    zenoh_util::try_init_log_from_env();

    let (config, timeout, size) = parse_args();
    let session = zenoh::open(config).res().await.unwrap().into_arc();

    // The key expression to publish data on
    let key_expr_ping = keyexpr::new("test/ping").unwrap();

    // The key expression on which the echoed response is awaited
    let key_expr_pong = keyexpr::new("test/pong").unwrap();


    // Shared state between the main task and the publishing task:
    // `is_running` asks the publisher loop to stop, `counter` counts sent messages.
    let is_running = Arc::new(AtomicBool::new(true));
    let counter = Arc::new(AtomicUsize::new(0));
    // Publisher configured with `CongestionControl::Block`.
    let publisher = session
        .declare_publisher(key_expr_ping)
        .congestion_control(CongestionControl::Block)
        .res()
        .await
        .unwrap();
    // The returned JoinHandle is discarded: the task is detached and nothing
    // below waits for it to finish.
    tokio::task::spawn({
        let is_running = is_running.clone();
        let counter = counter.clone();
        async move {
            // The flag is only re-checked between two `put` calls.
            while is_running.load(SeqCst) {
                // Send time in nanoseconds since the Unix epoch (UTC).
                let start = chrono::offset::Utc::now().timestamp_nanos_opt().unwrap();
                let mut payload = vec![1u8; size];
                // The first 8 bytes carry the send time (little-endian i64).
                // This slice panics when `size` is smaller than 8.
                payload[..8].clone_from_slice(&start.to_le_bytes());
                publisher.put(payload.to_vec()).res().await.unwrap();
                counter.fetch_add(1, SeqCst);
                // Optional throttling, left disabled so the publisher runs flat out:
                // std::thread::sleep(std::time::Duration::from_millis(50));
            }
        }
    });

    println!("[ping] Started measuring for {}(s)", timeout.as_secs());
    let sub = session.declare_subscriber(key_expr_pong).res().await.unwrap();
    let timer = std::time::Instant::now();
    // Round-trip times in microseconds, one entry per received sample.
    let mut samples = vec![];
    while let Ok(sample) = sub.recv_async().await {
        let payload = sample.value.payload.contiguous();
        // Read back the send time written into the first 8 bytes by the publisher
        // task (assumes the peer echoes the payload unchanged — confirm against z_pong).
        let start = i64::from_le_bytes(payload[..8].try_into().unwrap());
        let end = chrono::offset::Utc::now().timestamp_nanos_opt().unwrap();
        // Nanoseconds -> microseconds.
        let elpased = (end - start) / 1000;
        samples.push(elpased);
        // The timeout is only evaluated after a sample has been received, so
        // this loop does not end on time alone if no sample ever arrives.
        if timer.elapsed() > timeout {
            break;
        }
    }

    // Ask the publishing task to stop; the task itself is not awaited.
    is_running.swap(false, SeqCst);
    samples.sort();
    let sum: i64 = samples.iter().sum();
    // Percentiles are read by index from the sorted samples; the indexing
    // panics if `samples` is empty.
    println!(
        "[ping] RTT(us) p05: {}, p50: {}, p95: {}, avg: {:.02}",
        samples[(samples.len() as f64 * 0.05) as usize],
        samples[(samples.len() as f64 * 0.50) as usize],
        samples[(samples.len() as f64 * 0.95) as usize],
        sum as f64 / samples.len() as f64,
    );

    // Share of sent messages whose echo was received within the measurement.
    let sent = counter.load(SeqCst);
    let recv = samples.len();
    let rate = recv as f64 / sent as f64 * 100.0;
    println!("[ping] Recv rate {recv}/{sent} = {rate:.02}% messages");

    println!("[ping] Done");

    // `sub` and `session` go out of scope here while the detached publisher
    // task may still be inside `put`.
}

// Command-line arguments of the reproduction program.
// Plain `//` comments are used for this header on purpose: clap's derive turns
// `///` doc comments into user-facing help/about text.
#[derive(Parser)]
struct Args {
    /// The number of seconds to measure for (u64)
    // Help text corrected: this value bounds the measurement loop in `main`,
    // it is not a warm-up period.
    #[arg(short, long, default_value = "10")]
    timeout: u64,
    /// Sets the size of the payload to publish
    // `main` writes an 8-byte timestamp into the payload, so sizes below 8
    // make the publishing task panic.
    #[arg(short, long, default_value = "32")]
    payload_size: usize,
    // Options shared by the zenoh examples, merged into this command line.
    #[command(flatten)]
    common: CommonArgs,
}

fn parse_args() -> (Config, Duration, usize) {
    let args = Args::parse();
    (
        args.common.into(),
        Duration::from_secs(args.timeout),
        args.payload_size,
    )
}
  3. Execute the test (run each command in its own terminal, z_pong first)
cargo run --example z_pong -- --listen tcp/127.0.0.1:7447
cargo run --example z_ping_parallel -- --mode client --connect tcp/127.0.0.1:7447 --timeout 5

System info

  • zenoh: 25f06bd

YuanYuYuan avatar May 24 '24 09:05 YuanYuYuan