fast speed thread safe async execute queue

Latest Version Rust Documentation Rust Report Card Rust CI

Example RwModel

```rust use aqueue::model::RwModel; use asynctrait::asynctrait; use std::sync::Arc; use std::time::Instant; use tokio::try_join;

[derive(Default)]

struct Foo { count: u64, i: i128, }

impl Foo { pub fn add(&mut self, x: i32) -> i128 { self.count += 1; self.i += x as i128; self.i } fn reset(&mut self) { self.count = 0; self.i = 0; } pub fn get(&self) -> i128 { self.i } pub fn get_count(&self) -> u64 { self.count } }

[async_trait]

pub trait FooRunner { async fn add(&self, x: i32) -> i128; async fn reset(&self); async fn get(&self) -> i128; async fn get_count(&self) -> u64; }

[async_trait]

impl FooRunner for RwModel { async fn add(&self, x: i32) -> i128 { self.callmut(|inner| async move { inner.add(x) }).await } async fn reset(&self) { self.callmut(|inner| async move { inner.reset() }).await } async fn get(&self) -> i128 { self.call(|inner| async move { inner.get() }).await } async fn getcount(&self) -> u64 { self.call(|inner| async move { inner.getcount() }).await } }

[tokio::main]

async fn main() -> anyhow::Result<()> { { // Single thread test let tf = RwModel::new(Foo::default()); tf.add(100).await; asserteq!(100, tf.get().await); tf.add(-100).await; asserteq!(0, tf.get().await); tf.reset().await;

    let start = Instant::now();
    for i in 0..100000000 {
        tf.add(i).await;
    }

    println!(
        "test rw a count:{} value:{} time:{} qps:{}",
        tf.get_count().await,
        tf.get().await,
        start.elapsed().as_secs_f32(),
        tf.get_count().await / start.elapsed().as_millis() as u64 * 1000
    );
}

{
    //Multithreading test
    let tf = Arc::new(RwModel::new(Foo::default()));
    let start = Instant::now();
    let a_tf = tf.clone();
    let a = tokio::spawn(async move {
        for i in 0..25000000 {
            a_tf.add(i).await;
        }
    });

    let b_tf = tf.clone();
    let b = tokio::spawn(async move {
        for i in 25000000..50000000 {
            b_tf.add(i).await;
        }
    });

    let c_tf = tf.clone();
    let c = tokio::spawn(async move {
        for i in 50000000..75000000 {
            c_tf.add(i).await;
        }
    });

    let d_tf = tf.clone();
    let d = tokio::spawn(async move {
        for i in 75000000..100000000 {
            d_tf.add(i).await;
        }
    });

    try_join!(a, b, c, d)?;

    println!(
        "test rw b count:{} value:{} time:{} qps:{}",
        tf.get_count().await,
        tf.get().await,
        start.elapsed().as_secs_f32(),
        tf.get_count().await / start.elapsed().as_millis() as u64 * 1000
    );
}

Ok(())

}

shell test rw a count:100000000 value:4999999950000000 time:5.1791396 qps:19308000 test rw b count:100000000 value:4999999950000000 time:5.293417 qps:18892000 ```

Example Actor

```rust use aqueue::Actor; use asynctrait::asynctrait; use std::sync::Arc; use std::time::Instant; use tokio::try_join;

[derive(Default)]

struct Foo { count: u64, i: i128, }

impl Foo { pub fn add(&mut self, x: i32) -> i128 { self.count += 1; self.i += x as i128; self.i } fn reset(&mut self) { self.count = 0; self.i = 0; } pub fn get(&self) -> i128 { self.i } pub fn get_count(&self) -> u64 { self.count } }

[async_trait]

pub trait FooRunner { async fn add(&self, x: i32) -> i128; async fn reset(&self); async fn get(&self) -> i128; async fn get_count(&self) -> u64; }

[async_trait]

impl FooRunner for Actor { async fn add(&self, x: i32) -> i128 { self.innercall(|inner| async move { inner.getmut().add(x) }).await } async fn reset(&self) { self.innercall(|inner| async move { inner.getmut().reset() }).await } async fn get(&self) -> i128 { self.innercall(|inner| async move { inner.getmut().get() }).await } async fn getcount(&self) -> u64 { self.innercall(|inner| async move { inner.getmut().getcount() }).await } }

[tokio::main]

async fn main() -> anyhow::Result<()> { { // Single thread test let tf = Arc::new(Actor::new(Foo::default())); tf.add(100).await; asserteq!(100, tf.get().await); tf.add(-100).await; asserteq!(0, tf.get().await); tf.reset().await;

    let start = Instant::now();
    for i in 0..2000000 {
        tf.add(i).await;
    }

    println!(
        "test a count:{} value:{} time:{} qps:{}",
        tf.get_count().await,
        tf.get().await,
        start.elapsed().as_secs_f32(),
        tf.get_count().await / start.elapsed().as_millis() as u64 * 1000
    );
}

{
    //Multithreading test
    let tf = Arc::new(Actor::new(Foo::default()));
    let start = Instant::now();
    let a_tf = tf.clone();
    let a = tokio::spawn(async move {
        for i in 0..1000000 {
            a_tf.add(i).await;
        }
    });

    let b_tf = tf.clone();
    let b = tokio::spawn(async move {
        for i in 1000000..2000000 {
            b_tf.add(i).await;
        }
    });

    let c_tf = tf.clone();
    let c = tokio::spawn(async move {
        for i in 2000000..3000000 {
            c_tf.add(i).await;
        }
    });

    try_join!(a, b, c)?;

    println!(
        "test b count:{} value:{} time:{} qps:{}",
        tf.get_count().await,
        tf.get().await,
        start.elapsed().as_secs_f32(),
        tf.get_count().await / start.elapsed().as_millis() as u64 * 1000
    );
}

Ok(())

} shell test a count:2000000 value:1999999000000 time:0.098685 qps:20408000 test b count:3000000 value:4499998500000 time:0.1486727 qps:20270000 ```

Example Database

(use Actor Trait and Sqlx Sqlite)

```rust use anyhow::{anyhow, Result}; use aqueue::Actor; use asynctrait::asynctrait; use sqlx::sqlite::SqlitePoolOptions; use sqlx::SqlitePool; use std::env; use tokio::task::JoinHandle;

[derive(sqlx::FromRow, Debug)]

[allow(dead_code)]

pub struct User { id: i64, name: String, gold: f64, }

pub struct DataBases { auto_id: u32, pool: SqlitePool, }

unsafe impl Send for DataBases {} unsafe impl Sync for DataBases {}

impl DataBases { pub fn new(sqlitemaxconnections: u32) -> Result> { let pool = SqlitePoolOptions::new() .maxconnections(sqlitemaxconnections) .connectlazy(&env::var("DATABASE_URL")?)?;

    Ok(Actor::new(DataBases { auto_id: 0, pool }))
}
/// create user table from table.sql
async fn create_table(&self) -> Result<()> {
    sqlx::query(include_str!("table.sql")).execute(&self.pool).await?;
    Ok(())
}
/// insert user data
async fn insert_user(&mut self, name: &str, gold: f64) -> Result<bool> {
    // println!("insert {} name:{} gold:{}",self.auto_id,name,gold);
    self.auto_id += 1;
    let row = sqlx::query(
        r#"
        insert into `user`(`id`,`name`,`gold`)
        values(?,?,?)
     "#,
    )
        .bind(&self.auto_id)
        .bind(name)
        .bind(gold)
        .execute(&self.pool)
        .await?
        .rows_affected();

    Ok(row == 1)
}
/// insert user data
async fn select_all_users(&self) -> Result<Vec<User>> {
    Ok(sqlx::query_as::<_, User>("select * from `user`").fetch_all(&self.pool).await?)
}

}

[async_trait]

pub trait IDatabase { /// create user table from table.sql async fn createtable(&self) -> Result<()>; /// insert user data async fn insertuser(&self, name: String, gold: f64) -> Result; /// insert user data async fn insertuserrefname(&self, name: &str, gold: f64) -> Result; /// select all users table async fn selectall_users(&self) -> Result>; }

[async_trait]

impl IDatabase for Actor { async fn createtable(&self) -> Result<()> { self.innercall(|inner| async move { inner.get().createtable().await }).await } async fn insertuser(&self, name: String, gold: f64) -> Result { self.innercall(|inner| async move { inner.getmut().insertuser(&name, gold).await }) .await } async fn insertuserrefname(&self, name: &str, gold: f64) -> Result { self.innercall(|inner| async move { inner.getmut().insert_user(name, gold).await }) .await }

async fn select_all_users(&self) -> Result<Vec<User>> {
    unsafe {
        // warn:
        // This is a thread unsafe way to get
        // When using, please make sure there is no thread safety problem
        self.deref_inner().select_all_users().await
    }
}

}

lazystatic::lazystatic! { /// default global static database actor obj static ref DB:Actor={ DataBases::new(50).expect("install db error") }; }

[tokio::main]

async fn main() -> Result<()> { dotenv::dotenv().ok().okorelse(|| anyhow!(".env file not found"))?; DB.createtable().await?; let mut joinvec = Vec::withcapacity(100); // create 100 tokio task run it. for i in 0..100 { let join: JoinHandleuser(i.to_string(), j as f64).await?; } Ok(()) });

    join_vec.push(join);
}
//wait all task finish
for join in join_vec {
    join.await??;
}
// print all users
for user in DB.select_all_users().await? {
    println!("{:?}", user);
}
Ok(())

} ```

shell User { id: 1, name: "0", gold: 0.0 } User { id: 2, name: "0", gold: 0.0 } User { id: 3, name: "0", gold: 0.0 } User { id: 4, name: "10", gold: 0.0 } User { id: 5, name: "10", gold: 0.0 } User { id: 6, name: "16", gold: 0.0 } User { id: 7, name: "10", gold: 0.0 } ... User { id: 99996, name: "2", gold: 999.0 } User { id: 99997, name: "8", gold: 999.0 } User { id: 99998, name: "5", gold: 999.0 } User { id: 99999, name: "9", gold: 999.0 } User { id: 100000, name: "10", gold: 999.0 }

Example Basic

```rust use aqueue::AQueue; static mut VALUE:i32=0;

[tokio::main]

async fn main()->Result<(),Box> { let queue = AQueue::new(); let mut v=0i32; for i in 0..2000000 { v= queue.run(|x| async move { unsafe { // thread safe execute VALUE += x; VALUE } }, i).await; }

assert_eq!(v,-1455759936);
Ok(())

}

```

Example not used trait actor

```rust use aqueue::{AResult,AQueue}; use std::sync::Arc; use std::cell::{RefCell}; use std::error::Error; use std::time::Instant; use tokio::try_join;

struct Foo{ count:u64, i:i128 }

impl Foo{ pub fn add(&mut self,x:i32)->i128{ self.count+=1; self.i+=x as i128; self.i }

pub fn get(&self)->i128{
    self.i
}
pub fn get_count(&self)->u64{
    self.count
}

}

struct Store(RefCell); unsafe impl Sync for Store{} unsafe impl Send for Store{}

impl Store{ pub fn new(x:T)->Store{ Store(RefCell::new(x)) } }

struct FooRunner { inner:Arc>, queue:AQueue }

impl FooRunner { pub fn new()-> FooRunner { FooRunner { inner:Arc::new(Store::new(Foo{ count:0, i:0})), queue:AQueue::new() } } pub async fn add(&self,x:i32)->i128{ self.queue.run(|inner| async move { inner.0.borrow_mut().add(x) },self.inner.clone()).await }

pub async fn get(&self)->i128{
    self.queue.run(|inner| async move  {
        inner.0.borrow().get()
    },self.inner.clone()).await
}

pub async fn get_count(&self)->u64{
    self.queue.run(|inner| async move {
        inner.0.borrow().get_count()
    },self.inner.clone()).await
}

}

[tokio::main]

async fn main()->anyhow::Result<()> { { // Single thread test let tf = Arc::new(FooRunner::new()); tf.add(100).await?; asserteq!(100, tf.get().await?); tf.add(-100).await.unwrap(); asserteq!(0, tf.get().await?);

    let start = Instant::now();
    for i in 0..2000000 {
        tf.add(i);
    }

    println!("test a count:{} value:{} time:{} qps:{}",
             tf.get_count().await,
             tf.get().await,
             start.elapsed().as_secs_f32(),
             tf.get_count().await / start.elapsed().as_millis() as u64 * 1000);
}

{
    //Multithreading test
    let tf = Arc::new(FooRunner::new());
    let start = Instant::now();
    let a_tf = tf.clone();
    let a = tokio::spawn(async move {
        for i in 0..1000000 {
             a_tf.add(i);
        }
    });

    let b_tf = tf.clone();
    let b = tokio::spawn(async move {
        for i in 1000000..2000000 {
            b_tf.add(i);
        }
    });

    let c_tf = tf.clone();
    let c = tokio::spawn(async move {
        for i in 2000000..3000000 {
             c_tf.add(i).await;
        }
    });
    try_join!(a,b,c)?;
    println!("test b count:{} value:{} time:{} qps:{}",
             tf.get_count().await,
             tf.get().await,
             start.elapsed().as_secs_f32(),
             tf.get_count().await / start.elapsed().as_millis() as u64 * 1000);       
}

Ok(())

} ```