# `📟 MQRSTT`

[![Crates.io](https://img.shields.io/crates/v/mqrstt.svg)](https://crates.io/crates/mqrstt)
[![Docs](https://docs.rs/mqrstt/badge.svg)](https://docs.rs/mqrstt)
[![dependency status](https://deps.rs/repo/github/GunnarMorrigan/mqrstt/status.svg)](https://deps.rs/repo/github/GunnarMorrigan/mqrstt)
[![codecov](https://codecov.io/github/GunnarMorrigan/mqrstt/branch/main/graph/badge.svg)](https://app.codecov.io/gh/GunnarMorrigan/mqrstt)

`MQRSTT` is an MQTTv5 client that provides sync and async (smol and tokio) implementations.

Because this crate aims to be runtime agnostic, the user is required to provide their own data stream.
For an async approach the stream has to implement the smol or tokio [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
For a sync approach the stream has to implement the [`std::io::Read`] and [`std::io::Write`] traits.

## Features

## Examples

### Notes:

- Your handler should not wait too long
- Create a new connection when an error or disconnect is encountered
- Handlers only get incoming packets

Smol example:

```rust
use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_smol,
    packets::{self, Packet},
    AsyncEventHandler, NetworkStatus,
};
use async_trait::async_trait;
use bytes::Bytes;

pub struct PingPong {
    pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

smol::block_on(async {
    let options = ConnectOptions::new("mqrsttSmolExample".to_string());
    let (mut network, client) = new_smol(options);
    let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();
    network.connect(stream).await.unwrap();

    // This subscribe is only processed when we run the network
    client.subscribe("mqrstt").await.unwrap();

    let mut pingpong = PingPong {
        client: client.clone(),
    };
    let (n, t) = futures::join!(
        async {
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            smol::Timer::after(std::time::Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
    assert!(n.is_ok());
});
```

Tokio example:

```rust
use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_tokio,
    packets::{self, Packet},
    AsyncEventHandler, NetworkStatus,
};
use tokio::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;

pub struct PingPong {
    pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

#[tokio::main]
async fn main() {
    let options = ConnectOptions::new("TokioTcpPingPongExample".to_string());

    let (mut network, client) = new_tokio(options);

    let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();

    network.connect(stream).await.unwrap();

    client.subscribe("mqrstt").await.unwrap();

    let mut pingpong = PingPong {
        client: client.clone(),
    };

    let (n, _) = tokio::join!(
        async {
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            tokio::time::sleep(Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
}
```

Sync example:

```rust
use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_sync,
    packets::{self, Packet},
    EventHandler, NetworkStatus,
};
use std::net::TcpStream;
use bytes::Bytes;

pub struct PingPong {
    pub client: MqttClient,
}

impl EventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish_blocking(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            ).unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

let mut client_id: String = "SyncTcpPingReqTestExample".to_string();
let options = ConnectOptions::new(client_id);

let address = "broker.emqx.io";
let port = 1883;

let (mut network, client) = new_sync(options);

// IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
let stream = TcpStream::connect((address, port)).unwrap();
stream.set_nonblocking(true).unwrap();

network.connect(stream).unwrap();

let mut pingpong = PingPong {
    client: client.clone(),
};
let res_join_handle = std::thread::spawn(move ||
    loop {
        match network.poll(&mut pingpong) {
            Ok(NetworkStatus::Active) => continue,
            otherwise => return otherwise,
        }
    }
);

std::thread::sleep(std::time::Duration::from_secs(30));
client.disconnect_blocking().unwrap();
let join_res = res_join_handle.join();
assert!(join_res.is_ok());
let res = join_res.unwrap();
assert!(res.is_ok());
```

## FAQ

## License

Licensed under the Mozilla Public License, Version 2.0 (MPL-2.0).

## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or conditions.