acu

Utility crate for building asynchronous actors.

Before using this crate, I recommend getting familiar with the actor pattern in Rust; Alice Ryhl has written a very useful blog post about it.

Getting started

Add crate to dependencies

Using cargo-edit, run `cargo add acu`, or add the crate manually to your `Cargo.toml`.

Build your first Actor

```rust
use tokio::sync::oneshot;

[derive(Debug)]

enum Message { Increment, Get { respond_to: oneshot::Sender }, }

impl acu::Message for Message {}

// NOTE(review): `acu::Receiver` most likely takes type parameters that were lost
// from this snippet — confirm against the acu API.
/// Actor state: the receiving end of the channel and the counter it owns.
struct MyActor {
    receiver: acu::Receiver,
    counter: usize,
}

impl MyActor {
    /// Handles incoming messages until `recv` returns `None`.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                // The binding must match the `respond_to` field declared on `Message::Get`.
                Message::Get { respond_to } => respond_to.send(self.counter).unwrap(),
            }
        }
    }
}

[derive(Debug, Clone)]

struct MyActorHandle { sender: acu::Sender, }

impl MyActorHandle { pub fn new() -> Self { let (sender, receiver) = acu::channel(8, "MyActor"); let mut actor = MyActor { receiver, counter: 0, }; tokio::spawn(async move { actor.run().await }); Self { sender } }

pub async fn increment(&self) {
    self.sender.notify_with(|| Message::Increment).await
}

pub async fn get(&self) -> usize {
    self.sender
        .call_with(|respond_to| Message::Get { respond_to })
        .await
}

}

[tokio::main]

async fn main() { let handle = MyActorHandle::new(); println!("initial counter: {}", handle.get().await); for _ in 0..100 { handle.increment().await; } println!("counter after 100 increments: {}", handle.get().await); } ```

Or, if you would like to make use of the logging functionality, you need to initialize `log`, for example by using the `simple-log` crate:

```rust
// at the top of the main function
simple_log::quick!("debug");
```

Then each call/notify on the actor will get logged.

Master/slave pattern

You need to have the `master-slave` feature enabled for the crate.

The decision you need to make is whether the actor's `Message` type implements the `Clone` trait. If it does, you can use `BroadcasterMasterHandle`, which lets you call the actor methods directly on the master handle; if it does not, you are limited to `MasterHandle`, on which you cannot call the actor methods.

Using BroadcasterMasterHandle(Message: Clone)

```rust
use acu::BroadcasterMasterHandle;
use acu::MasterExt;
use tokio::sync::broadcast;

/// Names for the master and for each of the two actors.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
    Master,
    MyActorA,
    MyActorB,
}

impl acu::MasterName for Name { fn master_name() -> Self { Self::Master } }

impl AsRef for Name { fn as_ref(&self) -> &str { match self { Name::Master => "master", Name::MyActorA => "my-actor-a", Name::MyActorB => "my-actor-b", } } }

impl std::fmt::Display for Name { fn fmt(&self, f: &mut std::fmt::Formatter<'>) -> std::fmt::Result { let s: &str = self.asref(); f.write_str(s) } }

[derive(Debug, Clone)]

enum Message { Increment, Fetch { respond_to: broadcast::Sender, }, }

impl acu::Message for Message {}

// NOTE(review): `acu::Receiver` most likely takes type parameters that were lost
// from this snippet — confirm against the acu API.
/// Actor state: the receiving end of the channel and the counter it owns.
struct MyActor {
    receiver: acu::Receiver,
    counter: usize,
}

impl MyActor {
    /// Handles incoming messages until `recv` returns `None`.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                // The binding must match the `respond_to` field declared on `Message::Fetch`.
                Message::Fetch { respond_to } => {
                    respond_to.send(self.counter).unwrap();
                }
            }
        }
    }
}

/// Builds a channel for `name`, spawns a `MyActor` task on it and returns the handle.
// NOTE(review): the first example calls `acu::channel(8, "MyActor")` with a leading
// buffer size, while this call passes only the name — confirm the expected arguments.
fn my_actor(name: Name) -> MyActorHandle {
    let (tx, rx) = acu::channel(name);
    let mut actor = MyActor {
        receiver: rx,
        counter: 0,
    };
    tokio::spawn(async move { actor.run().await });
    MyActorHandle { sender: tx }
}

// Handle type returned by `my_actor`.
// NOTE(review): `acu::Handle` presumably takes type parameters that were lost from
// this snippet — confirm against the acu API.
type MyActorHandle = acu::Handle;

use asynctrait::asynctrait;

[async_trait]

trait MyActorExt { async fn increment(&self); async fn fetch(&self) -> Vec; }

[async_trait]

impl MyActorExt for MyActorHandle { async fn increment(&self) { self.sender.notify_with(|| Message::Increment).await }

async fn fetch(&self) -> Vec<usize> {
    self.sender
        .call_many_with(|respond_to| Message::Fetch { respond_to }, 8)
        .await
}

}

[tokio::main]

async fn main() { let handlea = myactor(Name::MyActorA); let handleb = myactor(Name::MyActorB); let master = { let master = BroadcasterMasterHandle::new(); master.push(handlea).await; master.push(handleb).await; master }; let getvalues = || async { let results = master.fetch().await; asserteq!(results.len(), 2); (results[0], results[1]) }; let printvalues = || async { let values = getvalues().await; println!("counter of MyActorA = {}", values.0); println!("counter of MyActorB = {}", values.1); println!(); }; for _ in 0..100 { master.increment().await; printvalues().await; } printvalues().await; { let actora = master.find(Name::MyActorA).await.unwrap(); for _ in 0..10 { actora.increment().await; } } print_values().await; } ```

Using MasterHandle(Message: ?Clone)

```rust
use acu::MasterHandle;
use acu::MasterExt;
use tokio::sync::oneshot;

/// Names for the master and for each of the two actors.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
    Master,
    MyActorA,
    MyActorB,
}

impl acu::MasterName for Name { fn master_name() -> Self { Self::Master } }

impl AsRef for Name { fn as_ref(&self) -> &str { match self { Name::Master => "master", Name::MyActorA => "my-actor-a", Name::MyActorB => "my-actor-b", } } }

impl std::fmt::Display for Name { fn fmt(&self, f: &mut std::fmt::Formatter<'>) -> std::fmt::Result { let s: &str = self.asref(); f.write_str(s) } }

[derive(Debug)]

enum Message { Increment, Fetch { respond_to: oneshot::Sender, }, }

impl acu::Message for Message {}

// NOTE(review): `acu::Receiver` most likely takes type parameters that were lost
// from this snippet — confirm against the acu API.
/// Actor state: the receiving end of the channel and the counter it owns.
struct MyActor {
    receiver: acu::Receiver,
    counter: usize,
}

impl MyActor {
    /// Handles incoming messages until `recv` returns `None`.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                // The binding must match the `respond_to` field declared on `Message::Fetch`.
                Message::Fetch { respond_to } => {
                    respond_to.send(self.counter).unwrap();
                }
            }
        }
    }
}

/// Builds a channel for `name`, spawns a `MyActor` task on it and returns the handle.
// NOTE(review): the first example calls `acu::channel(8, "MyActor")` with a leading
// buffer size, while this call passes only the name — confirm the expected arguments.
fn my_actor(name: Name) -> MyActorHandle {
    let (tx, rx) = acu::channel(name);
    let mut actor = MyActor {
        receiver: rx,
        counter: 0,
    };
    tokio::spawn(async move { actor.run().await });
    MyActorHandle { sender: tx }
}

// Handle type returned by `my_actor`.
// NOTE(review): `acu::Handle` presumably takes type parameters that were lost from
// this snippet — confirm against the acu API.
type MyActorHandle = acu::Handle;

use asynctrait::asynctrait;

[async_trait]

trait MyActorExt { async fn increment(&self); async fn fetch(&self) -> usize; }

[async_trait]

impl MyActorExt for MyActorHandle { async fn increment(&self) { self.sender.notify_with(|| Message::Increment).await }

async fn fetch(&self) -> usize {
    self.sender
        .call_with(|respond_to| Message::Fetch { respond_to })
        .await
}

}

[tokio::main]

async fn main() { let handlea = myactor(Name::MyActorA); let handleb = myactor(Name::MyActorB); let master = { let master = MasterHandle::new(); master.push(handlea).await; master.push(handleb).await; master }; let gethandles = || async { let handlea = master.find(Name::MyActorA).await.unwrap(); let handleb = master.find(Name::MyActorA).await.unwrap(); (handlea, handleb) }; let getvalues = || async { let (handlea, handleb) = gethandles().await; (handlea.fetch().await, handleb.fetch().await) }; let printvalues = || async { let values = getvalues().await; println!("counter of MyActorA = {}", values.0); println!("counter of MyActorB = {}", values.1); println!(); }; for _ in 0..100 { let (handlea, handleb) = gethandles().await; handlea.increment().await; handleb.increment().await; printvalues().await; } printvalues().await; { let actora = master.find(Name::MyActorA).await.unwrap(); for _ in 0..10 { actora.increment().await; } } print_values().await; } ```

All examples can be found in the `examples/` directory.

Motivation

I wanted to reuse some structs and functions across a few of my projects, including Houseflow, and I thought they might be useful for other projects as well.