zenoh
zenoh copied to clipboard
Sending fails to stop properly after the session is closed. ERROR: Route data with unknown scope
Describe the bug
2024-05-24T09:16:37.076191Z INFO main ThreadId(01) zenoh::net::runtime: Using ZID: c8fb696718026344604f9b074a3ff1e7
[ping] Started measuring for 5(s)
[ping] RTT(us) p05: 93393, p50: 769794, p95: 1409960, avg: 763211.63
[ping] Recv rate 520142/705828 = 73.69% messages
[ping] Done
2024-05-24T09:16:42.795357Z ERROR rx-0 ThreadId(07) zenoh::handlers: sending on a closed channel
2024-05-24T09:16:42.797147Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.797181Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.797188Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
...
2024-05-24T09:16:42.798251Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.798258Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.798264Z ERROR rx-0 ThreadId(07) zenoh::net::routing::dispatcher::pubsub: Route data with unknown scope 2!
2024-05-24T09:16:42.799943Z WARN net-0 ThreadId(05) zenoh::net::runtime::orchestrator: Unable to connect to tcp/127.0.0.1:7447! Received a close message (reason MAX_LINKS) in response to an OpenSyn on: TransportLinkUnicast { link: Link { src: tcp/127.0.0.1:44314, dst: tcp/127.0.0.1:7447, mtu: 65535, is_reliable: true, is_streamed: true }, config: TransportLinkUnicastConfig { direction: Outbound, batch: BatchConfig { mtu: 65535, is_streamed: true, is_compression: false } } } at io/zenoh-transport/src/unicast/establishment/open.rs:444.
2024-05-24T09:16:42.800186Z ERROR net-0 ThreadId(05) zenoh::net::runtime::orchestrator: [] Unable to connect to any of [tcp/127.0.0.1:7447]! at zenoh/src/net/runtime/orchestrator.rs:278.
To reproduce
- Add `chrono` as a dependency of the examples crate:
cargo add chrono
- Put the following file in the examples directory and name it z_ping_parallel.
use clap::Parser;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
};
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::publication::CongestionControl;
use zenoh_examples::CommonArgs;
#[tokio::main]
async fn main() {
    // Number of leading payload bytes that carry the little-endian send timestamp (an `i64`).
    const TIMESTAMP_LEN: usize = 8;

    // initiate logging
    zenoh_util::try_init_log_from_env();

    let (config, timeout, size) = parse_args();
    // The send timestamp is written into the first TIMESTAMP_LEN bytes of every payload,
    // so a smaller payload cannot be built; report it instead of panicking on the slice.
    if size < TIMESTAMP_LEN {
        eprintln!("[ping] payload size must be at least {TIMESTAMP_LEN} bytes, got {size}");
        return;
    }
    let session = zenoh::open(config).res().await.unwrap().into_arc();

    // The key expression to publish data on
    let key_expr_ping = keyexpr::new("test/ping").unwrap();
    // The key expression to wait the response back
    let key_expr_pong = keyexpr::new("test/pong").unwrap();

    // Flag polled by the publishing task, and the number of messages it has sent.
    let is_running = Arc::new(AtomicBool::new(true));
    let counter = Arc::new(AtomicUsize::new(0));

    let publisher = session
        .declare_publisher(key_expr_ping)
        .congestion_control(CongestionControl::Block)
        .res()
        .await
        .unwrap();

    // Publish timestamped payloads in a separate task, with no throttling between puts.
    // NOTE(review): the join handle is not awaited, so this task may still be inside `put`
    // when `main` returns and the session is dropped. That looks like the condition this
    // reproducer is meant to exercise, so it is kept as is.
    tokio::task::spawn({
        let is_running = is_running.clone();
        let counter = counter.clone();
        async move {
            while is_running.load(SeqCst) {
                let start = chrono::offset::Utc::now().timestamp_nanos_opt().unwrap();
                let mut payload = vec![1u8; size];
                payload[..TIMESTAMP_LEN].copy_from_slice(&start.to_le_bytes());
                // `payload` is already an owned Vec; hand it over without copying it again.
                publisher.put(payload).res().await.unwrap();
                counter.fetch_add(1, SeqCst);
            }
        }
    });

    println!("[ping] Started measuring for {}(s)", timeout.as_secs());
    let sub = session.declare_subscriber(key_expr_pong).res().await.unwrap();
    let timer = std::time::Instant::now();
    let mut samples = vec![];
    while let Ok(sample) = sub.recv_async().await {
        let payload = sample.value.payload.contiguous();
        // Skip samples too short to carry a timestamp rather than panicking on them.
        let stamp = payload
            .get(..TIMESTAMP_LEN)
            .and_then(|bytes| <[u8; TIMESTAMP_LEN]>::try_from(bytes).ok());
        if let Some(stamp) = stamp {
            let start = i64::from_le_bytes(stamp);
            let end = chrono::offset::Utc::now().timestamp_nanos_opt().unwrap();
            // Nanoseconds to microseconds.
            let elapsed = (end - start) / 1000;
            samples.push(elapsed);
        }
        // The deadline is only evaluated after a sample has been received.
        if timer.elapsed() > timeout {
            break;
        }
    }
    // Ask the publishing task to stop after its current iteration.
    is_running.store(false, SeqCst);

    samples.sort_unstable();
    if samples.is_empty() {
        // Indexing percentiles of an empty vector would panic.
        println!("[ping] RTT(us) no samples received");
    } else {
        let sum: i64 = samples.iter().sum();
        let percentile = |q: f64| samples[(samples.len() as f64 * q) as usize];
        println!(
            "[ping] RTT(us) p05: {}, p50: {}, p95: {}, avg: {:.02}",
            percentile(0.05),
            percentile(0.50),
            percentile(0.95),
            sum as f64 / samples.len() as f64,
        );
    }

    let sent = counter.load(SeqCst);
    let recv = samples.len();
    let rate = recv as f64 / sent as f64 * 100.0;
    println!("[ping] Recv rate {recv}/{sent} = {rate:.02}% messages");
    println!("[ping] Done");
}
// Command-line options of this example. `common` is flattened in, so its options appear
// alongside `timeout` and `payload_size`.
// Plain `//` comments are used here on purpose: clap turns `///` comments into help text.
#[derive(Parser)]
struct Args {
    /// The number of seconds to measure for (u64)
    #[arg(short, long, default_value = "10")]
    timeout: u64,
    /// Sets the size in bytes of the payload to publish (minimum 8)
    #[arg(short, long, default_value = "32")]
    payload_size: usize,
    #[command(flatten)]
    common: CommonArgs,
}
fn parse_args() -> (Config, Duration, usize) {
let args = Args::parse();
(
args.common.into(),
Duration::from_secs(args.timeout),
args.payload_size,
)
}
- Execute the test
cargo run --example z_pong -- --listen tcp/127.0.0.1:7447
cargo run --example z_ping_parallel -- --mode client --connect tcp/127.0.0.1:7447 --timeout 5
System info
- zenoh: 25f06bd