Add integration examples with other libraries and runtimes.
One thing that is missing, and that we need, is guidance on how to integrate this with other libraries and runtimes like Tokio, actix, or async-std.
We shall start adding these soon.
Thank you for this library!! I have made a simple example using Tokio, can you take a look?
use mqttrs::{
Connect, ConnectReturnCode, Packet, Pid, Protocol, Publish, QoS, QosPid, Subscribe,
SubscribeTopic, decode_slice, encode_slice,
};
use std::time::Duration;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::mpsc,
};
/// Connection settings used by the background MQTT task.
#[derive(Debug, Clone)]
pub struct MqttOptions {
    /// Client identifier sent in the CONNECT packet.
    pub(crate) client: String,
    /// Broker host name or address; combined with `port` to form the TCP address.
    pub(crate) host: String,
    /// Broker TCP port.
    pub(crate) port: u16,
    /// Optional user name sent in the CONNECT packet.
    pub(crate) username: Option<String>,
    /// Optional password sent (as bytes) in the CONNECT packet.
    pub(crate) password: Option<String>,
    /// Keep-alive value in seconds sent in the CONNECT packet; the PINGREQ
    /// interval is derived from it.
    pub(crate) keep_alive: u16,
}
/// Handle to the background MQTT task.
///
/// Holds the options the task was started with and the sending side of the
/// command channel; clones of the handle send into the same channel.
#[derive(Clone, Debug)]
pub struct MqttListener {
    /// Options the background task was configured with.
    config: MqttOptions,
    /// Sending half of the channel used to hand commands to the task.
    sender: mpsc::Sender<MqttCommand>,
}
/// Commands sent from `MqttListener` handles to the background task.
#[derive(Debug)]
enum MqttCommand {
    /// Publish `payload` on `topic`.
    Publish {
        /// Topic name to publish to.
        topic: String,
        /// Raw message payload.
        payload: Vec<u8>,
    },
}
impl MqttListener {
pub fn init(config: MqttOptions) -> Self {
let (tx, mut rx) = mpsc::channel::<MqttCommand>(100);
let mqtt = Self {
config: config,
sender: tx.clone(),
};
mqtt.spawn_thread(rx);
mqtt
}
pub fn publish(&self, topic: &str, payload: &[u8]) {
if let Err(e) = self.sender.try_send(MqttCommand::Publish {
topic: topic.to_string(),
payload: payload.to_vec(),
}) {
tracing::error!(target: "mqtt", "unable to send publish command to thread: {}", e);
};
}
fn spawn_thread(&self, mut rx: mpsc::Receiver<MqttCommand>) {
let config = self.config.clone();
tokio::spawn(async move {
let mut backoff = 1;
'reconnect: loop {
tracing::debug!(target: "mqtt", "Connecting to MQTT broker...");
let addr = format!("{}:{}", config.host, config.port);
let stream = match TcpStream::connect(&addr).await {
Ok(s) => {
tracing::debug!(target: "mqtt", "Connected to broker");
backoff = 1; // reset on success
s
}
Err(e) => {
tracing::error!(target: "mqtt", "Connect error: {}", e);
let delay = Duration::from_secs((backoff * 2).min(60));
tracing::debug!(target: "mqtt", "Retrying in {} seconds", delay.as_secs());
tokio::time::sleep(delay).await;
backoff *= 2;
continue 'reconnect;
}
};
let (mut mqtt_read, mut mqtt_write) = stream.into_split();
// CONNECT
let connect_pkt = Packet::Connect(Connect {
protocol: Protocol::MQTT311,
keep_alive: config.keep_alive,
client_id: &config.client,
clean_session: true,
last_will: None,
username: config.username.as_deref(),
password: config.password.as_deref().map(|s| s.as_bytes()),
});
let mut buf = [0u8; 1024];
let len = encode_slice(&connect_pkt, &mut buf).expect("Invalid CONNECT");
if mqtt_write.write_all(&buf[..len]).await.is_err() {
continue 'reconnect;
}
let mut res = [0u8; 1024];
let n = mqtt_read.read(&mut res).await.unwrap_or(0);
if n == 0 {
continue 'reconnect;
}
match decode_slice(&res[..n]) {
Ok(Some(Packet::Connack(connack)))
if connack.code == ConnectReturnCode::Accepted =>
{
tracing::debug!(target: "mqtt", "Connection accepted");
}
_ => {
tracing::error!(target: "mqtt", "Connection rejected or bad CONNACK");
continue 'reconnect;
}
}
// SUBSCRIBE
let pid = Pid::new();
let subscribe_pkt = Packet::Subscribe(Subscribe {
pid,
topics: vec![SubscribeTopic {
topic_path: "test/topic".into(),
qos: QoS::AtMostOnce,
}],
});
let len = encode_slice(&subscribe_pkt, &mut buf).expect("Invalid SUBSCRIBE");
if mqtt_write.write_all(&buf[..len]).await.is_err() {
continue 'reconnect;
}
let n = mqtt_read.read(&mut res).await.unwrap_or(0);
if n == 0 {
continue 'reconnect;
}
match decode_slice(&res[..n]) {
Ok(Some(Packet::Suback(ack))) if ack.pid == pid => {
tracing::debug!(target: "mqtt", "Subscribed");
}
_ => {
tracing::error!(target: "mqtt", "SUBACK failed");
continue 'reconnect;
}
}
let ping_interval = Duration::from_secs(config.keep_alive as u64 / 2);
let mut ping_buf = [0u8; 1024];
let mut recv_buf = [0u8; 1024];
let mut publish_buf = [0u8; 1024];
let mut last_ping = tokio::time::Instant::now();
loop {
tokio::select! {
biased;
Some(cmd) = rx.recv() => {
match cmd {
MqttCommand::Publish { topic, payload } => {
let publish_pkt = Packet::Publish(Publish {
dup: false,
qospid: QosPid::AtMostOnce,
retain: false,
topic_name: &topic,
payload: &payload,
});
if let Ok(len) = encode_slice(&publish_pkt, &mut publish_buf) {
if mqtt_write.write_all(&publish_buf[..len]).await.is_err() {
tracing::error!(target: "mqtt", "Publish failed");
break;
} else {
tracing::debug!(target: "mqtt", "Published to topic");
}
}
}
}
}
_ = tokio::time::sleep_until(last_ping + ping_interval) => {
let len = encode_slice(&Packet::Pingreq, &mut ping_buf).expect("Invalid PINGREQ");
if mqtt_write.write_all(&ping_buf[..len]).await.is_err() {
tracing::error!(target: "mqtt", "PINGREQ failed");
break;
}
tracing::debug!(target: "mqtt", "Sent PINGREQ");
last_ping = tokio::time::Instant::now();
}
n = mqtt_read.read(&mut recv_buf) => {
let n = match n {
Ok(n) if n > 0 => n,
_ => break,
};
match decode_slice(&recv_buf[..n]) {
Ok(Some(pkt)) => match pkt {
Packet::Pingresp => {
tracing::debug!(target: "mqtt", "Received PINGRESP");
}
Packet::Publish(p) => {
tracing::debug!(target: "mqtt", "Received PUBLISH: topic = {}", p.topic_name);
}
_ => {}
},
_ => {
tracing::warn!(target: "mqtt", "Failed to decode incoming packet");
}
}
}
}
}
tracing::warn!(target: "mqtt", "Disconnected from broker");
let delay = Duration::from_secs((backoff * 2).min(60));
tracing::debug!(target: "mqtt", "Reconnecting in {} seconds...", delay.as_secs());
tokio::time::sleep(delay).await;
backoff *= 2;
}
});
}
}
Thanks for this, @Elsoberanold ! If possible, could you update the README with this information? That would really help us merge this as a PR, and it might benefit others as well.
Alternatively, you could create an /examples directory and add this as a .md file. That would be super helpful too.
That looks too big for the README; it'd be better in an examples folder. It would be great to make sure that it gets picked up by example scraping, and maybe to see if it makes sense to manually link it in some reference docs.
Also, need to review that example to see if it's idiomatic and insightful. Didn't look at that yet, sorry.