rumqtt
rumqttd: disconnect notification does not fire for non-graceful disconnects.
Expected Behavior
When the OS reports that a socket is disconnected, the link/broker should publish a disconnect notification.
Current Behavior
No event is published signaling the disconnect.
Context
id = 0
[router]
id = 0
max_connections = 10010
max_outgoing_packet_count = 200
max_segment_size = 104857600
max_segment_count = 10
[v4.1]
name = "v4-1"
listen = "0.0.0.0:1337"
next_connection_delay_ms = 1
[v4.1.connections]
connection_timeout_ms = 60000
max_payload_size = 20480
max_inflight_count = 100
dynamic_filters = true
let mut config = load_cfg();
if let Some(ref mut v4) = config.v4 {
    if let Some(settings) = v4.get_mut("1") {
        settings.connections.external_auth = Some(Arc::new(authorize_client));
    }
}
let mut broker = Broker::new(config);
let (mut link_tx, mut link_rx) = broker.link("singlenode").unwrap();
thread::spawn(move || {
    broker.start().unwrap();
});
link_tx.subscribe("#").unwrap();
loop {
    let notification = match link_rx.recv().unwrap() {
        Some(v) => v,
        None => continue,
    };
    match notification {
        Notification::Forward(forward) => {
            println!(
                "Topic = {:?}, Payload = {} bytes",
                forward.publish.topic,
                forward.publish.payload.len()
            );
        }
        Notification::Disconnect(d, props) => {
            println!("========== disconnect: \n{d:#?}\n{props:#?}");
        }
        v => {
            println!("{v:?}");
        }
    }
}
Connect a client, then cut the client's network access. Eventually the OS reports the socket as disconnected and tracing logs the error, but no disconnect notification is sent. The only trace in the logs is:
ERROR rumqttd::server::broker: Disconnected!!, error: Network(Io(Os { code: 110, kind: TimedOut, message: "Connection timed out" }))
To shorten the default OS keepalive timing to 1 second with 3 probes, I pulled in libc and tested with this:
let (stream, addr) = match listener.accept().await {
    Ok((s, r)) => {
        let fd = s.as_raw_fd();
        // Every option below takes an i32 value, so one length covers all calls.
        let optlen = mem::size_of::<i32>() as libc::socklen_t;
        // Enable keepalive probes on the accepted socket.
        let _ = unsafe {
            libc::setsockopt(
                fd,
                libc::SOL_SOCKET,
                libc::SO_KEEPALIVE,
                &1i32 as *const i32 as *const libc::c_void,
                optlen,
            )
        };
        // Send the first probe after 1 second of idle time.
        let _ = unsafe {
            libc::setsockopt(
                fd,
                libc::IPPROTO_TCP,
                libc::TCP_KEEPIDLE,
                &1i32 as *const i32 as *const libc::c_void,
                optlen,
            )
        };
        // Give up after 3 unanswered probes.
        let _ = unsafe {
            libc::setsockopt(
                fd,
                libc::IPPROTO_TCP,
                libc::TCP_KEEPCNT,
                &3i32 as *const i32 as *const libc::c_void,
                optlen,
            )
        };
        // Wait 1 second between probes.
        let _ = unsafe {
            libc::setsockopt(
                fd,
                libc::IPPROTO_TCP,
                libc::TCP_KEEPINTVL,
                &1i32 as *const i32 as *const libc::c_void,
                optlen,
            )
        };
        (s, r)
    }
    Err(e) => {
        error!(error=?e, "Unable to accept socket.");
        continue;
    }
};
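For reference, a rough estimate of how long these keepalive settings take to surface a dead peer. This is a minimal sketch using only the standard library. The struct and function names are hypothetical and not part of rumqttd or libc. It assumes the peer vanishes silently and that no unacknowledged data is in flight (otherwise TCP retransmission timeouts may report the failure at a different point).

```rust
use std::time::Duration;

/// Hypothetical container mirroring TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT.
struct KeepaliveParams {
    idle: Duration,
    interval: Duration,
    probes: u32,
}

/// Approximate time from the last received data until the kernel gives up:
/// the idle period, followed by `probes` unanswered probes spaced `interval` apart.
fn approx_detection_time(p: &KeepaliveParams) -> Duration {
    p.idle + p.interval * p.probes
}

/// Settings used in the snippet above: 1 s idle, 1 s interval, 3 probes.
fn issue_settings() -> KeepaliveParams {
    KeepaliveParams {
        idle: Duration::from_secs(1),
        interval: Duration::from_secs(1),
        probes: 3,
    }
}

/// Usual Linux defaults: 7200 s idle, 75 s interval, 9 probes.
fn linux_defaults() -> KeepaliveParams {
    KeepaliveParams {
        idle: Duration::from_secs(7200),
        interval: Duration::from_secs(75),
        probes: 9,
    }
}
```

Under these assumptions, `approx_detection_time(&issue_settings())` comes to about 4 seconds, while `approx_detection_time(&linux_defaults())` is 7875 seconds, a little over two hours.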
- Operating System: Linux
- Toolchain version: rustc 1.74.1 (a28077b28 2023-12-04)
Notification::Disconnect was introduced in https://github.com/bytebeamio/rumqtt/pull/616 so that we can send Disconnect packets to v5 clients (which was the best way I could think of at that time).
Guess my time has come to figure out a better way to send v5 disconnect packets and update/remove Notification::Disconnect(_,_), haha.
PS: Notification is used for sending notifications from the router to a connection. You can think of the link as a local client! So you will only get a notification if the link itself is disconnected, not when other clients disconnect!
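To illustrate that last point, here is a toy model of per-link delivery. It is not rumqttd code: `ToyRouter`, `ToyNotification`, and the link id "device-1" are made up for this sketch, which only assumes what is described above, namely that a disconnect notification is queued for the link being disconnected and for no one else.

```rust
use std::collections::HashMap;

/// Toy stand-in for a router-to-link notification (not a rumqttd type).
#[derive(Debug, PartialEq)]
enum ToyNotification {
    Disconnect,
}

/// Toy router: every link, local or remote, has its own outbox.
#[derive(Default)]
struct ToyRouter {
    outboxes: HashMap<String, Vec<ToyNotification>>,
}

impl ToyRouter {
    /// Register a link with an empty outbox.
    fn add_link(&mut self, id: &str) {
        self.outboxes.insert(id.to_string(), Vec::new());
    }

    /// Queue a disconnect only for the link that is being disconnected.
    fn disconnect(&mut self, id: &str) {
        if let Some(outbox) = self.outboxes.get_mut(id) {
            outbox.push(ToyNotification::Disconnect);
        }
    }

    /// Number of notifications waiting for the given link.
    fn pending(&self, id: &str) -> usize {
        self.outboxes.get(id).map_or(0, Vec::len)
    }
}
```

In this model, after adding the links "singlenode" and "device-1" and calling `disconnect("device-1")`, the "singlenode" outbox stays empty. That matches the behavior reported in this issue: the local link's `recv()` loop never sees another client's disconnect.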
Ah yeah, it is critical for our end to know as fast as possible when a client is no longer connected.