The actix-mqtt-client crate is an MQTT v3.1.1 client based on the actix framework.
First, create two actix actors: one for receiving publish messages and the other for receiving error messages from the client. You can also create an optional actix actor for receiving the stop message:

```rust
pub struct ErrorActor;
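
impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();

    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}
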
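pub struct MessageActor;

impl actix::Actor for MessageActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    type Result = ();

    fn handle(&mut self, msg: PublishMessage, _: &mut Self::Context) -> Self::Result {
        log::info!(
            "Got message: id:{}, topic: {}, payload: {:?}",
            msg.id,
            msg.topic_name,
            msg.payload
        );
    }
}
```
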
Then, connect to the server (using tokio) and use the read and write parts of the stream, along with the actors, to create a `MqttClient`:

```rust
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;

use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{delay_until, Instant};

use actix_mqtt_client::client::{MqttClient, MqttOptions};

System::run(|| {
    let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
    let future = async move {
        let result = async move {
            let stream = TcpStream::connect(socket_addr).await?;
            let (r, w) = split(stream);
            log::info!("TCP connected");
            let mut client = MqttClient::new(
                r,
                w,
                String::from("test"),
                MqttOptions::default(),
                MessageActor.start().recipient(),
                ErrorActor.start().recipient(),
                None,
            );
            client.connect().await?;
            log::info!("MQTT connected");
            log::info!("Subscribe");
            client
                .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
                .await?;
            log::info!("Publish");
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level0,
                    Vec::from("test".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            delay_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level1,
                    Vec::from("test2".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            delay_until(delay_time).await;
            client
                .publish(
                    String::from("test"),
                    mqtt::QualityOfService::Level2,
                    Vec::from("test3".as_bytes()),
                )
                .await?;
            log::info!("Wait for 10s");
            let delay_time = Instant::now() + Duration::new(10, 0);
            delay_until(delay_time).await;
            log::info!("Disconnect");
            client.disconnect(false).await?;
            log::info!("Check if disconnect is successful");
            Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
        }
        .await;
        result.unwrap()
    };
    Arbiter::spawn(future);
})
.unwrap();
```