Message Bus

Async Message Bus for Rust

Inspired by Actix

Basics

  1. Delivers messages between actors through receivers (usually queue implementations)
  2. Messages are distinguished and routed by their TypeId
  3. Messages are delivered in a broadcast fashion: each subscribed receiver gets its own clone
  4. Different kinds of receivers are implemented. The handler traits are:

```rust
// Handler is Sync, so many concurrent tasks can be spawned
pub trait Handler<M: Message>: Send + Sync {
    fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
    fn sync(&self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}

#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
    async fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
    async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}

// Handler is not Sync, so tasks cannot run concurrently; access is
// serialized with a synchronization primitive such as Mutex or RwLock
pub trait SynchronizedHandler<M: Message>: Send {
    fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
    fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}

#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
    async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
    async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}

// Handler is not Sync and processes messages in batches
pub trait BatchSynchronizedHandler<M: Message>: Send {
    fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
    fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}

#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
    async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
    async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> { Ok(()) }
}
```

  5. Handler kinds:
     1. No synchronization needed (handler is `Send` + `Sync`)
        * Not batched operations **(implemented)**
          - sync (`spawn_blocking`)
          - async (`spawn`)
        * Batched
          - sync (`spawn_blocking`)
          - async (`spawn`)
     2. Synchronization needed (handler is `Send` + `!Sync`)
        * Not batched operations **(implemented)**
          - sync (`spawn_blocking`)
          - async (`spawn`)
        * Batched **(implemented)**
          - sync (`spawn_blocking`)
          - async (`spawn`)
     3. Synchronization needed and thread dedicated (handler is `!Sync` + `!Send`)
        * Not batched operations
          - sync (`spawn_blocking`)
          - async (`spawn`)
        * Batched
          - sync (`spawn_blocking`)
          - async (`spawn`)
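The difference between the first two handler kinds can be sketched with the standard library alone. This is only an illustration under stated assumptions: `SharedHandler`, `ExclusiveHandler`, `Counter`, `Collector`, `run_shared`, and `run_exclusive` are hypothetical names, not messagebus types, and plain threads stand in for the spawned tasks.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-ins for the two handler shapes (not messagebus traits).
trait SharedHandler: Send + Sync {
    fn handle(&self, msg: u32);
}

trait ExclusiveHandler: Send {
    fn handle(&mut self, msg: u32);
}

// `&self` handler: its state must be thread-safe on its own.
struct Counter(AtomicUsize);

impl SharedHandler for Counter {
    fn handle(&self, msg: u32) {
        self.0.fetch_add(msg as usize, Ordering::SeqCst);
    }
}

// `&mut self` handler: plain state, so calls have to be serialized.
struct Collector(Vec<u32>);

impl ExclusiveHandler for Collector {
    fn handle(&mut self, msg: u32) {
        self.0.push(msg);
    }
}

// Calls the shared handler from several threads at once; no lock is needed.
fn run_shared(msgs: &[u32]) -> usize {
    let handler = Arc::new(Counter(AtomicUsize::new(0)));
    let workers: Vec<_> = msgs
        .iter()
        .map(|&m| {
            let h = Arc::clone(&handler);
            thread::spawn(move || {
                h.handle(m);
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    handler.0.load(Ordering::SeqCst)
}

// Wraps the exclusive handler in a Mutex so only one call runs at a time.
fn run_exclusive(msgs: &[u32]) -> Vec<u32> {
    let handler = Arc::new(Mutex::new(Collector(Vec::new())));
    let workers: Vec<_> = msgs
        .iter()
        .map(|&m| {
            let h = Arc::clone(&handler);
            thread::spawn(move || {
                h.lock().unwrap().handle(m);
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    // Arrival order depends on scheduling, so sort before returning.
    let mut seen = handler.lock().unwrap().0.clone();
    seen.sort();
    seen
}

fn main() {
    println!("shared sum: {}", run_shared(&[1, 2, 3]));
    println!("exclusive log: {:?}", run_exclusive(&[1, 2, 3]));
}
```

A `Send + Sync` handler can be shared as-is, while a handler taking `&mut self` needs a lock (or a dedicated task) around every call, which is the trade-off the list above describes.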

  6. Example:

```rust
use messagebus::{Bus, AsyncHandler, Result as MbusResult, receivers};
use async_trait::async_trait;

struct TmpReceiver;

// Handles i32 messages and forwards an i64 message to the bus
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
    async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
        println!("---> i32 {}", msg);

        bus.send(2i64).await?;

        Ok(())
    }
}

// Handles the i64 messages sent by the i32 handler
#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
    async fn handle(&self, msg: i64, _bus: &Bus) -> MbusResult {
        println!("---> i64 {}", msg);

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // Register one receiver and subscribe it to both message types
    let (b, poller) = Bus::build()
        .register(TmpReceiver)
            .subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
            .subscribe::<i64, receivers::BufferUnorderedAsync<_>>(Default::default())
            .done()
        .build();

    b.send(1i32).await.unwrap();
    poller.await
}
```
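
To make the Basics items on `TypeId` routing and cloned broadcast delivery more concrete, here is a minimal standard-library sketch. `ToyBus`, `subscribe`, `send`, and `demo` are hypothetical names chosen for illustration; the sketch is synchronous and single-threaded and does not reflect how messagebus is implemented internally.

```rust
use std::any::{Any, TypeId};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Type-erased callback; each one downcasts back to its concrete message type.
type Callback = Box<dyn FnMut(&dyn Any)>;

// Hypothetical toy bus: maps a message's TypeId to its subscribers.
#[derive(Default)]
struct ToyBus {
    routes: HashMap<TypeId, Vec<Callback>>,
}

impl ToyBus {
    // Registers a receiver for messages of type `M`.
    fn subscribe<M: Any + Clone>(&mut self, mut f: impl FnMut(M) + 'static) {
        let cb: Callback = Box::new(move |any: &dyn Any| {
            if let Some(msg) = any.downcast_ref::<M>() {
                // Broadcast: every receiver gets its own clone.
                f(msg.clone());
            }
        });
        self.routes.entry(TypeId::of::<M>()).or_default().push(cb);
    }

    // Delivers `msg` to all receivers of type `M`; returns how many were reached.
    fn send<M: Any + Clone>(&mut self, msg: M) -> usize {
        match self.routes.get_mut(&TypeId::of::<M>()) {
            Some(receivers) => {
                for r in receivers.iter_mut() {
                    r(&msg);
                }
                receivers.len()
            }
            None => 0,
        }
    }
}

// Two i32 receivers and one i64 receiver write into a shared log.
fn demo() -> (Vec<String>, usize) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut bus = ToyBus::default();

    let l = Rc::clone(&log);
    bus.subscribe(move |m: i32| {
        l.borrow_mut().push(format!("a got i32 {}", m));
    });
    let l = Rc::clone(&log);
    bus.subscribe(move |m: i32| {
        l.borrow_mut().push(format!("b got i32 {}", m));
    });
    let l = Rc::clone(&log);
    bus.subscribe(move |m: i64| {
        l.borrow_mut().push(format!("c got i64 {}", m));
    });

    bus.send(1i32);
    bus.send(2i64);
    // No receiver is subscribed to &str, so nothing is delivered.
    let unrouted = bus.send("nobody listens");

    let out = log.borrow().clone();
    (out, unrouted)
}

fn main() {
    let (log, unrouted) = demo();
    println!("{:?}, unrouted receivers: {}", log, unrouted);
}
```

Here the `i32` message reaches both `a` and `b`, the `i64` message reaches only `c`, and the `&str` message reaches no one because nothing is registered under its `TypeId`.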