Rumqttd: Broker threads panicking under heavy load
How to reproduce:
- Create a broker using rumqttd-0.19.0
- Set max_connections to some number, let's say 7000
- Bombard it with MQTTX using a higher number of clients, let's say 7001.
I downloaded MQTTX from https://mqttx.app/ and used the following command:
./mqttx-cli-linux-x64 bench conn -V 3.1.1 -c 7001 -i 1
The console starts to fill with thread panic errors.
Code used to start the broker:
pub fn start_broker(app_config: ApplicationConfig) {
    let port = app_config.mqtt_port;
    let router = RouterConfig {
        max_connections: 7000,
        max_outgoing_packet_count: 200,
        max_segment_size: 104857600,
        max_segment_count: 10,
        ..Default::default()
    };
    let connections = ConnectionSettings {
        connection_timeout_ms: 5000,
        max_payload_size: MQTT_PACKET_MAX_SIZE,
        max_inflight_count: 10000,
        auth: None,
        external_auth: Some(Arc::new(move |client_id, username, password| {
            authenticate_mqtt_client(client_id, username, password)
        })),
        dynamic_filters: false,
    };
    let host;
    let tls;
    if port == 1883 || cfg!(debug_assertions) {
        host = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
        tls = None;
    } else {
        host = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
        tls = Some(TlsConfig::Rustls {
            capath: None,
            certpath: app_config.tls_cert_string,
            keypath: app_config.tls_key_string,
        });
    }
    let server = ServerSettings {
        name: "my_mqtt_broker".to_string(),
        listen: SocketAddr::V4(host),
        tls,
        next_connection_delay_ms: 1,
        connections,
    };
    let server_map = HashMap::from([
        (String::from("v4"), server),
    ]);
    let config: Config = Config {
        id: 1,
        router,
        v4: Some(server_map),
        ..Default::default()
    };
    let mut broker = Broker::new(config);
    tokio::spawn(async move {
        log::info!("MQTT Broker running in {host}");
        broker.start().unwrap()
    });
}
Relevant Environment variables: RUST_LOG=info,rumqttd
I debugged where the errors come from and traced them to the function named remote in rumqttd/src/server/broker.rs. Unwrapping unchecked mutexes in the will handlers causes the issue; I tested a fix for them and got it working. The problem is that the mutex is already locked when the unwrap panics, so the resource stays locked from that point on and no new connections can be made. Existing connections stay alive.
Here's a link to my changes: https://github.com/bytebeamio/rumqtt/compare/rumqttd-0.19.0...jarvjani:rumqtt:tread_panic_fix
This is a really great library, and I am hoping we get a v5 broker at some point! Let me know how I can help get the relevant changes merged.
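To illustrate the failure mode in isolation, here is a minimal sketch that does not use rumqttd's code. It assumes a std::sync::Mutex guarding some shared map; the Registry type and the remove_unchecked / remove_checked helpers are hypothetical names made up for this example. With std::sync::Mutex, a panic while the guard is held poisons the mutex, so every later lock().unwrap() panics as well, which would match the console filling with thread panics. The actual locking in broker.rs may differ.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for shared broker state; not rumqttd's actual type.
type Registry = Arc<Mutex<HashMap<String, u32>>>;

/// Panicking variant: unwraps a missing entry while the guard is held.
/// The panic unwinds through the guard and poisons the mutex.
fn remove_unchecked(registry: &Registry, client_id: &str) -> u32 {
    let mut guard = registry.lock().unwrap();
    guard.remove(client_id).unwrap()
}

/// Non-panicking variant: returns None for a missing entry and
/// recovers the guard even if an earlier panic poisoned the mutex.
fn remove_checked(registry: &Registry, client_id: &str) -> Option<u32> {
    let mut guard = registry
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.remove(client_id)
}

fn main() {
    let registry: Registry = Arc::new(Mutex::new(HashMap::new()));

    // One connection thread hits the unchecked unwrap and panics.
    let r = Arc::clone(&registry);
    let result = thread::spawn(move || remove_unchecked(&r, "missing")).join();
    assert!(result.is_err());

    // From now on, plain lock().unwrap() would panic in every other thread.
    assert!(registry.is_poisoned());
    assert!(registry.lock().is_err());

    // The checked variant keeps working on the same state.
    assert_eq!(remove_checked(&registry, "missing"), None);
}
```

The design point is simply to avoid unwrap on values that can legitimately be absent while a lock is held, and to decide explicitly how to handle a poisoned lock rather than panicking again.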