A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels.

Unlike the `Bus` from the `bus` crate, `double_decker::Bus` is unbounded, and everyone knows that double-decker buses carry more passengers than a regular bus 🤷♂️.

Unlike `bus::Bus`, `double_decker::Bus` implements a cheap `clone()`, which I've found useful.

You should probably just use the `bus` crate because it's mature and completely lock-free.

`T` must implement `Clone` so it can be passed to all consumers.

When you call `add_rx()`, a `Sender<T>`/`Receiver<T>` pair is created and the `Sender` is stored in a `HashMap` behind a `RwLock`.

`broadcast()` takes shared read access to the `RwLock` and sends the event to each `Receiver` in the order they were added.

Lock contention can only occur when the number of subscribers changes, as this requires write access to the `RwLock`. This happens when you call `add_rx()`, or when you call `broadcast()` and one or more of the `Sender`s returns `SendError` because it's disconnected.
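
The locking scheme described above can be sketched roughly as follows. This is not the crate's source: `SketchBus`, `Inner` and `subscriber_count` are hypothetical names, `std::sync::mpsc` stands in for the crossbeam channels so the sketch has no dependencies, and a `BTreeMap` keyed by an increasing id stands in for the `HashMap` so that iteration follows insertion order. Sharing the state through an `Arc` is one assumed way to get a cheap `clone()`.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

// Hypothetical shared state: one sender per subscriber, keyed by insertion id.
struct Inner<T> {
    next_id: usize,
    senders: BTreeMap<usize, Sender<T>>,
}

// Hypothetical stand-in for the bus; cloning only copies the Arc.
pub struct SketchBus<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

impl<T> Clone for SketchBus<T> {
    fn clone(&self) -> Self {
        SketchBus { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Clone> SketchBus<T> {
    pub fn new() -> Self {
        let inner = Inner { next_id: 0, senders: BTreeMap::new() };
        SketchBus { inner: Arc::new(RwLock::new(inner)) }
    }

    // Adding a subscriber changes the map, so it needs the write lock.
    pub fn add_rx(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        let mut inner = self.inner.write().unwrap();
        let id = inner.next_id;
        inner.next_id += 1;
        inner.senders.insert(id, tx);
        rx
    }

    // Broadcasting only reads the map, so concurrent broadcasts share the lock.
    pub fn broadcast(&self, event: T) {
        let mut dead = Vec::new();
        {
            let inner = self.inner.read().unwrap();
            for (id, tx) in inner.senders.iter() {
                // A send error means the receiver was dropped.
                if tx.send(event.clone()).is_err() {
                    dead.push(*id);
                }
            }
        } // read guard released here

        // Only take the write lock if a disconnected sender must be removed.
        if !dead.is_empty() {
            let mut inner = self.inner.write().unwrap();
            for id in dead {
                inner.senders.remove(&id);
            }
        }
    }

    pub fn subscriber_count(&self) -> usize {
        self.inner.read().unwrap().senders.len()
    }
}
```

In this sketch the write lock is taken only in `add_rx()` and in the cleanup branch of `broadcast()`, which mirrors the two contention cases listed above.
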

Single-send, multi-consumer example
```rust
use double_decker::Bus;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));
```
Multi-send, multi-consumer example
```rust
use double_decker::Bus;
use std::thread;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});

// every value should be received by both receivers
for i in 1..100 {
    // rx1
    assert_eq!(rx1.recv(), Ok(i));
    // and rx2
    assert_eq!(rx2.recv(), Ok(i));
}

j.join().unwrap();
```

Also included are `subscribe` and `subscribe_on_thread`, which let you subscribe to broadcast events with a closure that is called on every broadcast. `subscribe` is blocking, whereas `subscribe_on_thread` calls the closure from another thread.

`subscribe_on_thread` returns a `Subscription`, which you should hang on to because the thread terminates when it is dropped.

```rust
use double_decker::{Bus, SubscribeOnThread};

let bus = Bus::<i32>::new();

// This would block
// bus.subscribe(Box::new(move |_event| {
//     // This closure is called on every broadcast
// }));

let subscription = bus.subscribe_on_thread(Box::new(move |_event| {
    // This closure is called on every broadcast
}));

bus.broadcast(5);
```
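
To illustrate why the `Subscription` needs to be kept alive, here is a minimal sketch of a handle that stops its worker thread when dropped. It is an assumption about how such a handle could work, not the crate's implementation: `SketchSubscription` and `subscribe_on_thread_sketch` are hypothetical names, the receiver is a plain `std::sync::mpsc::Receiver`, and the stop signal is a polled `AtomicBool`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical handle: dropping it asks the worker thread to stop and waits for it.
pub struct SketchSubscription {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

// Hypothetical helper: runs `callback` on a new thread for every received event.
pub fn subscribe_on_thread_sketch<T, F>(rx: Receiver<T>, mut callback: F) -> SketchSubscription
where
    T: Send + 'static,
    F: FnMut(T) + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let stop_in_thread = Arc::clone(&stop);

    let handle = thread::spawn(move || {
        // Poll with a short timeout so the stop flag is checked regularly.
        while !stop_in_thread.load(Ordering::Acquire) {
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(event) => callback(event),
                Err(RecvTimeoutError::Timeout) => continue,
                // All senders are gone, so no more events can arrive.
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    });

    SketchSubscription { stop, handle: Some(handle) }
}

impl Drop for SketchSubscription {
    fn drop(&mut self) {
        // Signal the worker, then wait for it to finish.
        self.stop.store(true, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```

If the value returned by `subscribe_on_thread_sketch` is not bound to a variable, it is dropped immediately and the worker thread exits almost at once, which is the pitfall the advice above guards against.
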
License: MIT