Keep alive depends on actual communication
### Notes:
- Your handler should not wait too long
- Create a new connection when an error or disconnect is encountered
- Handlers only get incoming packets
```rust use mqrstt::{ MqttClient, ConnectOptions, newsmol, packets::{self, Packet}, AsyncEventHandler, NetworkStatus, }; use asynctrait::async_trait; use bytes::Bytes; pub struct PingPong { pub client: MqttClient, }
impl AsyncEventHandler for PingPong { // Handlers only get INCOMING packets. This can change later. async fn handle(&mut self, event: packets::Packet) -> () { match event { Packet::Publish(p) => { if let Ok(payload) = String::fromutf8(p.payload.tovec()) { if payload.tolowercase().contains("ping") { self.client .publish( p.qos, p.retain, p.topic.clone(), Bytes::fromstatic(b"pong"), ) .await .unwrap(); println!("Received Ping, Send pong!"); } } }, Packet::ConnAck() => { println!("Connected!") }, _ => (), } } } smol::blockon(async { let options = ConnectOptions::new("mqrsttSmolExample".tostring()); let (mut network, client) = newsmol(options); let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)) .await .unwrap(); network.connect(stream).await.unwrap();
// This subscribe is only processed when we run the network
client.subscribe("mqrstt").await.unwrap();
let mut pingpong = PingPong {
client: client.clone(),
};
let (n, t) = futures::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
}); ```
```rust
use mqrstt::{ MqttClient, ConnectOptions, newtokio, packets::{self, Packet}, AsyncEventHandler, NetworkStatus, }; use tokio::time::Duration; use asynctrait::async_trait; use bytes::Bytes;
pub struct PingPong { pub client: MqttClient, }
impl AsyncEventHandler for PingPong { // Handlers only get INCOMING packets. This can change later. async fn handle(&mut self, event: packets::Packet) -> () { match event { Packet::Publish(p) => { if let Ok(payload) = String::fromutf8(p.payload.tovec()) { if payload.tolowercase().contains("ping") { self.client .publish( p.qos, p.retain, p.topic.clone(), Bytes::fromstatic(b"pong"), ) .await .unwrap(); println!("Received Ping, Send pong!"); } } }, Packet::ConnAck(_) => { println!("Connected!") }, _ => (), } } }
/// Connects to a public broker over plain TCP, subscribes to `mqrstt`, and
/// runs the ping/pong handler for 30 seconds before disconnecting.
#[tokio::main]
async fn main() {
    let options = ConnectOptions::new("TokioTcpPingPongExample".to_string());
    let (mut network, client) = new_tokio(options);
    let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();
    network.connect(stream).await.unwrap();

    // This subscribe is only processed when we run the network
    client.subscribe("mqrstt").await.unwrap();

    let mut pingpong = PingPong {
        client: client.clone(),
    };

    // Drive the network and a 30 second timer concurrently.
    let (n, _) = tokio::join!(
        async {
            // Keep polling while the network reports `Active`; any other
            // status or error ends the loop and becomes the result.
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            tokio::time::sleep(Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
    // Surface a failed network loop instead of silently dropping its result.
    assert!(n.is_ok());
}
```
Why are there no implementations for TLS connections?
Many examples of creating TLS streams in Rust exist with the crates `async-rustls` and `tokio-rustls`. The focus of this crate is `MQTTv5` and leaving the choice of runtime free.
`rumqttc`?
- No `rumqttc` packet id collision errors (this is not possible with `MQRSTT`).

Please ask :)
With the smol runtime you can create very small binaries. A simple PingPong smol TCP client can be had for ~550 KB, and with TLS you are looking at ~1.5 MB, using the following flags. This makes `mqrstt` extremely useful for embedded devices! :)
```toml
[profile.release]
opt-level = "z" # Optimize for size.
lto = true
codegen-units = 1
strip = true
```
Licensed under the Mozilla Public License, Version 2.0 (MPL-2.0).
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or conditions.