Keep alive depends on actual communication
### Notes:
- Your handler should not wait too long
- Create a new connection when an error or disconnect is encountered
- Handlers only get incoming packets
```rust
use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_smol,
    packets::{self, Packet},
    AsyncEventHandler, NetworkStatus,
};
use async_trait::async_trait;
use bytes::Bytes;

/// Event handler that answers every incoming "ping" publish with a "pong".
pub struct PingPong {
    /// Client handle used to publish the reply.
    pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                // Only react to payloads that are valid UTF-8.
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    // Case-insensitive match on "ping".
                    if payload.to_lowercase().contains("ping") {
                        // Reply on the same topic with the same QoS and retain flag.
                        self.client
                            .publish(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

smol::block_on(async {
    let options = ConnectOptions::new("mqrsttSmolExample".to_string());
    let (mut network, client) = new_smol(options);
    let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();
    network.connect(stream).await.unwrap();

    // This subscribe is only processed when we run the network
    client.subscribe("mqrstt").await.unwrap();

    let mut pingpong = PingPong {
        client: client.clone(),
    };

    let (n, _) = futures::join!(
        // Keep polling the network for as long as it reports `Active`;
        // any other status or an error ends the loop and is returned.
        async {
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        // Disconnect after 30 seconds.
        async {
            smol::Timer::after(std::time::Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
    assert!(n.is_ok());
});
```
```rust
use mqrstt::{ MqttClient, ConnectOptions, newtokio, packets::{self, Packet}, AsyncEventHandler, NetworkStatus, }; use tokio::time::Duration; use asynctrait::async_trait; use bytes::Bytes;
pub struct PingPong { pub client: MqttClient, }
impl AsyncEventHandler for PingPong { // Handlers only get INCOMING packets. This can change later. async fn handle(&mut self, event: packets::Packet) -> () { match event { Packet::Publish(p) => { if let Ok(payload) = String::fromutf8(p.payload.tovec()) { if payload.tolowercase().contains("ping") { self.client .publish( p.qos, p.retain, p.topic.clone(), Bytes::fromstatic(b"pong"), ) .await .unwrap(); println!("Received Ping, Send pong!"); } } }, Packet::ConnAck(_) => { println!("Connected!") }, _ => (), } } }
async fn main() { let options = ConnectOptions::new("TokioTcpPingPongExample".to_string());
let (mut network, client) = new_tokio(options);
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
.await
.unwrap();
network.connect(stream).await.unwrap();
client.subscribe("mqrstt").await.unwrap();
let mut pingpong = PingPong {
client: client.clone(),
};
let (n, _) = tokio::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
}
```
```rust
use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_sync,
    packets::{self, Packet},
    EventHandler, NetworkStatus,
};
use std::net::TcpStream;
use bytes::Bytes;

/// Event handler that answers every incoming "ping" publish with a "pong".
pub struct PingPong {
    /// Client handle used to publish the reply.
    pub client: MqttClient,
}

impl EventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                // Only react to payloads that are valid UTF-8.
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    // Case-insensitive match on "ping".
                    if payload.to_lowercase().contains("ping") {
                        // Reply on the same topic with the same QoS and retain flag.
                        self.client
                            .publish_blocking(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            ).unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

let client_id: String = "SyncTcpPingReqTestExample".to_string();
let options = ConnectOptions::new(client_id);

let address = "broker.emqx.io";
let port = 1883;

let (mut network, client) = new_sync(options);

// IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
let stream = TcpStream::connect((address, port)).unwrap();
stream.set_nonblocking(true).unwrap();

network.connect(stream).unwrap();

let mut pingpong = PingPong {
    client: client.clone(),
};

// Drive the network on its own thread for as long as it reports `Active`;
// any other status or an error ends the loop and is returned from the thread.
let res_join_handle = std::thread::spawn(move ||
    loop {
        match network.poll(&mut pingpong) {
            Ok(NetworkStatus::Active) => continue,
            otherwise => return otherwise,
        }
    }
);

// Disconnect after 30 seconds, then check that the network thread ended cleanly.
std::thread::sleep(std::time::Duration::from_secs(30));
client.disconnect_blocking().unwrap();
let join_res = res_join_handle.join();
assert!(join_res.is_ok());
let res = join_res.unwrap();
assert!(res.is_ok());
```
Why are there no implementations for TLS connections?
Many examples of creating TLS streams in Rust exist with the crates `async-rustls` and `tokio-rustls`. The focus of this crate is MQTTv5 and providing a free choice of runtime.
How does this compare to `rumqttc`?
No `rumqttc` packet id collision errors (they are not possible with `MQRSTT`).

Please ask :)
Licensed under MPL-2.0.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or conditions.