Message Bus

Async Message Bus for Rust

Inspired by Actix

Basics

  1. Can deliver messages between actors using receivers (usually queue implementations)
  2. Messages are distinguished and delivered by TypeId
  3. Messages are delivered in a broadcast fashion to many receivers (cloned)
  4. Different kinds of receivers are implemented; the corresponding handler kinds are listed below
  5. Request/response API. There is an example in demo_req_resp.rs
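
Points 2 and 3 can be pictured with a small standard-library sketch. It is not how messagebus is implemented: `MiniBus`, `subscribe` and `send` are hypothetical names chosen for illustration, and delivery here is synchronous. The sketch only shows the idea of keying receivers by `TypeId` and handing each receiver its own clone of the message.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical toy bus, NOT the messagebus implementation.
// Callbacks are stored per message type and receive a type-erased reference.
type Callback = Box<dyn Fn(&dyn Any)>;

#[derive(Default)]
struct MiniBus {
    subscribers: HashMap<TypeId, Vec<Callback>>,
}

impl MiniBus {
    // Register a receiver for messages of type `M`.
    fn subscribe<M, F>(&mut self, f: F)
    where
        M: Any + Clone,
        F: Fn(M) + 'static,
    {
        let cb: Callback = Box::new(move |any: &dyn Any| {
            // Callbacks are keyed by TypeId::of::<M>(), so this downcast succeeds.
            if let Some(msg) = any.downcast_ref::<M>() {
                f(msg.clone()); // every receiver gets its own clone
            }
        });
        self.subscribers
            .entry(TypeId::of::<M>())
            .or_default()
            .push(cb);
    }

    // Broadcast `msg` to all receivers of its type; returns how many got it.
    fn send<M: Any + Clone>(&self, msg: M) -> usize {
        match self.subscribers.get(&TypeId::of::<M>()) {
            Some(list) => {
                for cb in list {
                    cb(&msg);
                }
                list.len()
            }
            None => 0,
        }
    }
}

fn main() {
    let mut bus = MiniBus::default();
    bus.subscribe(|m: i32| println!("first i32 receiver: {}", m));
    bus.subscribe(|m: i32| println!("second i32 receiver: {}", m));
    bus.subscribe(|m: String| println!("String receiver: {}", m));

    let to_i32 = bus.send(7i32); // routed to both i32 receivers
    let to_string = bus.send(String::from("hello")); // routed to the String receiver
    let to_u8 = bus.send(1u8); // no receiver registered for u8
    println!("delivered: {} {} {}", to_i32, to_string, to_u8);
}
```

Sending a type with no subscribers (the `u8` above) simply reaches nobody in this sketch; how the real bus reports that case is not covered here.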

Here is the list of implemented handler kinds:

```rust
// Sync, not batched; no synchronization needed (`Send + Sync`).
pub trait Handler<M: Message>: Send + Sync {
    type Error: crate::Error;
    type Response: Message;

    fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Async, not batched; no synchronization needed (`Send + Sync`).
#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
    type Error: crate::Error;
    type Response: Message;

    async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Sync, not batched; synchronization needed (`Send` only, takes `&mut self`).
pub trait SynchronizedHandler<M: Message>: Send {
    type Error: crate::Error;
    type Response: Message;

    fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Async, not batched; synchronization needed (`Send` only, takes `&mut self`).
#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
    type Error: crate::Error;
    type Response: Message;

    async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Sync, batched; no synchronization needed (`Send + Sync`).
pub trait BatchHandler<M: Message>: Send + Sync {
    type Error: crate::Error;
    type Response: Message;

    fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Async, batched; no synchronization needed (`Send + Sync`).
#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
    type Error: crate::Error;
    type Response: Message;

    async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Sync, batched; synchronization needed (`Send` only, takes `&mut self`).
pub trait BatchSynchronizedHandler<M: Message>: Send {
    type Error: crate::Error;
    type Response: Message;

    fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

// Async, batched; synchronization needed (`Send` only, takes `&mut self`).
#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
    type Error: crate::Error;
    type Response: Message;

    async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}
```

  6. Implemented handler kinds:

    1. No synchronization needed (handler implements `Send` and `Sync`)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
    2. Synchronization needed (handler implements only `Send` but not `Sync`)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
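
The split between the two groups follows from `&self` versus `&mut self` in the handler signatures. The sketch below illustrates that difference with plain standard-library threads rather than the crate's tasks. `SharedSum`, `ExclusiveSum`, `run_shared` and `run_synchronized` are hypothetical names, and the `Mutex` is just one assumed way to serialize access, not necessarily what messagebus does internally.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical `Send + Sync` handler: `handle` takes `&self`,
// so several threads may call it concurrently without a lock.
struct SharedSum(AtomicU64);

impl SharedSum {
    fn handle(&self, msg: u64) {
        self.0.fetch_add(msg, Ordering::Relaxed);
    }
}

// Hypothetical `Send`-only handler: `Cell` makes it `!Sync`, and
// `handle` takes `&mut self`, so calls have to be serialized.
struct ExclusiveSum(Cell<u64>);

impl ExclusiveSum {
    fn handle(&mut self, msg: u64) {
        *self.0.get_mut() += msg;
    }
}

// One thread per message, all sharing the handler through an `Arc`.
fn run_shared(msgs: Vec<u64>) -> u64 {
    let handler = Arc::new(SharedSum(AtomicU64::new(0)));
    let workers: Vec<_> = msgs
        .into_iter()
        .map(|m| {
            let h = Arc::clone(&handler);
            thread::spawn(move || {
                h.handle(m);
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    handler.0.load(Ordering::Relaxed)
}

// Same shape, but every call goes through a `Mutex` to obtain `&mut`.
fn run_synchronized(msgs: Vec<u64>) -> u64 {
    let handler = Arc::new(Mutex::new(ExclusiveSum(Cell::new(0))));
    let workers: Vec<_> = msgs
        .into_iter()
        .map(|m| {
            let h = Arc::clone(&handler);
            thread::spawn(move || {
                h.lock().unwrap().handle(m);
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    let total = handler.lock().unwrap().0.get();
    total
}

fn main() {
    println!("shared: {}", run_shared(vec![1, 2, 3, 4]));
    println!("synchronized: {}", run_synchronized(vec![1, 2, 3, 4]));
}
```

Both functions compute the same sum; the difference is only that the second one has to take a lock for each message because the handler cannot be shared by reference across threads.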

  7. Not yet implemented handler kinds:

    1. Synchronization needed and thread dedicated (handler is `!Sync` and `!Send`)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
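
Since this kind is not implemented, the following is only a sketch of what "thread dedicated" could mean, using the standard library. `LocalHandler` and `run_dedicated` are hypothetical names. The assumed approach is that the handler is created, used and dropped on a single thread, and only the messages and the final result cross the thread boundary.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// Hypothetical handler that is neither `Send` nor `Sync` because it holds an `Rc`.
struct LocalHandler {
    log: Rc<RefCell<Vec<i32>>>,
}

impl LocalHandler {
    fn handle(&mut self, msg: i32) {
        self.log.borrow_mut().push(msg * 2);
    }
}

// Runs the handler on one dedicated thread and feeds it through a channel.
fn run_dedicated(msgs: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();

    let worker = thread::spawn(move || {
        // The handler is constructed here, so the `Rc` never leaves this thread.
        let mut handler = LocalHandler {
            log: Rc::new(RefCell::new(Vec::new())),
        };
        for msg in rx {
            handler.handle(msg);
        }
        // Copy plain data out; `Vec<i32>` is `Send`, the `Rc` is not.
        let out = handler.log.borrow().to_vec();
        out
    });

    for m in msgs {
        tx.send(m).unwrap();
    }
    drop(tx); // closing the channel ends the worker loop

    worker.join().unwrap()
}

fn main() {
    println!("{:?}", run_dedicated(vec![1, 2, 3]));
}
```

Moving a `LocalHandler` into `thread::spawn` directly would not compile, which is why the sketch builds it inside the worker closure.
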
  8. Example:

```rust
use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncHandler, Bus};

struct TmpReceiver;

// Handles `i32` messages and forwards an `i64` message to the bus.
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i32 {}", msg);

        bus.send(2i64).await?;

        Ok(())
    }
}

// Handles the `i64` messages sent by the `i32` handler above.
#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i64, _bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i64 {}", msg);

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // `receivers::BufferUnorderedAsync` is assumed here as the receiver
    // for async, non-batched handlers.
    let (b, poller) = Bus::build()
        .register(TmpReceiver)
        .subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
        .subscribe::<i64, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
        .done()
        .build();

    b.send(1i32).await.unwrap();

    println!("flush");
    b.flush().await;

    println!("close");
    b.close().await;

    println!("closed");

    poller.await;
    println!("[done]");
}
```