```rust
use sqlx::{SqlitePool}; use std::sync::Arc; use aqueue::Actor; use sqlx::sqlite::SqlitePoolOptions; use anyhow::*; use asynctrait ::asynctrait; use std::env; use tokio::task::JoinHandle;
pub struct User { id: i64, name: String, gold:f64 }
pub struct DataBases{
autoid:u32,
pool:SqlitePool
}
unsafe impl Send for DataBases{}
unsafe impl Sync for DataBases{}
impl DataBases{
pub fn new(sqlitemaxconnections:u32)->anyhow::Result
async fn insert_user(&mut self,name:&str,gold:f64)->Result<bool> {
self.auto_id+=1;
let row = sqlx::query(r#"
insert into `user`(`id`,`name`,`gold`)
values(?,?,?)
"#).bind(&self.auto_id)
.bind(name)
.bind(gold)
.execute(&self.pool)
.await?
.last_insert_rowid();
Ok(row == 1)
}
async fn select_all_users(&self)->Result<Vec<User>>{
Ok(sqlx::query_as::<_,User>("select * from `user`").fetch_all(&self.pool).await?)
}
}
pub trait IDatabase{
async fn createtable(&self)->Result<()>;
async fn insertuser(&self,user:String,gold:f64)->Result
impl IDatabase for Actor
}
async fn select_all_users(&self) -> Result<Vec<User>> {
unsafe{
self.deref_inner().select_all_users().await
}
}
}
async fn main()->Result<()> {
dotenv::dotenv().ok().okorelse(||anyhow!(".env file not found"))?;
let db= DataBases::new(10)?;
db.createtable().await?;
let mut joinvec=Vec::withcapacity(100);
for i in 0..100 {
let innerdb = db.clone();
let join:JoinHandle
for join in joinvec {
join.await??;
}
for user in db.selectall_users().await? {
println!("{:?}",user);
}
Ok(())
}
```
shell
cargo run --color=always --package aqueue --example test_sqlx --release
Finished release [optimized] target(s) in 0.21s
Running `target\release\examples\test_sqlx.exe`
User { id: 1, name: "0", gold: 0.0 }
User { id: 2, name: "1", gold: 0.0 }
User { id: 3, name: "2", gold: 0.0 }
User { id: 4, name: "3", gold: 0.0 }
User { id: 5, name: "4", gold: 0.0 }
User { id: 6, name: "5", gold: 0.0 }
...
User { id: 9996, name: "0", gold: 95.0 }
User { id: 9997, name: "0", gold: 96.0 }
User { id: 9998, name: "0", gold: 97.0 }
User { id: 9999, name: "0", gold: 98.0 }
User { id: 10000, name: "0", gold: 99.0 }
Process finished with exit code 0
```rust use aqueue::AQueue; static mut VALUE:i32=0;
async fn main()->Result<(),Box
assert_eq!(v,-1455759936);
}
```
```rust
use aqueue::{AResult,AQueue}; use std::sync::Arc; use std::cell::{RefCell}; use std::error::Error; use std::time::Instant; use anyhow::*;
struct Foo{ count:u64, i:i128 }
impl Foo{ pub fn add(&mut self,x:i32)->i128{ self.count+=1; self.i+=x as i128; self.i }
pub fn get(&self)->i128{
self.i
}
pub fn get_count(&self)->u64{
self.count
}
}
struct Store
impl
struct FooRunner {
inner:Arc
impl FooRunner {
pub fn new()-> FooRunner {
FooRunner {
inner:Arc::new(Store::new(Foo{ count:0, i:0})),
queue:AQueue::new()
}
}
pub async fn add(&self,x:i32)->Result
pub async fn get(&self)->Result<i128>{
self.queue.run(async move |inner| {
Ok(inner.0.borrow().get())
},self.inner.clone()).await
}
pub async fn get_count(&self)->Result<u64>{
self.queue.run(async move |inner| {
Ok(inner.0.borrow().get_count())
},self.inner.clone()).await
}
}
async fn main()->Result<()> { { // Single thread test let tf = Arc::new(FooRunner::new()); tf.add(100).await?; asserteq!(100, tf.get().await?); tf.add(-100).await.unwrap(); asserteq!(0, tf.get().await?);
let start = Instant::now();
for i in 0..2000000 {
if let Err(er) = tf.add(i).await {
println!("{}", er);
};
}
println!("test a count:{} value:{} time:{} qps:{}",
tf.get_count().await?,
tf.get().await?,
start.elapsed().as_secs_f32(),
tf.get_count().await? / start.elapsed().as_millis() as u64 * 1000);
}
{
//Multithreading test
let tf = Arc::new(FooRunner::new());
let start = Instant::now();
let a_tf = tf.clone();
let a = tokio::spawn(async move {
for i in 0..1000000 {
if let Err(er) = a_tf.add(i).await {
println!("{}", er);
};
}
});
let b_tf = tf.clone();
let b = tokio::spawn(async move {
for i in 1000000..2000000 {
if let Err(er) = b_tf.add(i).await {
println!("{}", er);
};
}
});
let c_tf = tf.clone();
let c = tokio::spawn(async move {
for i in 2000000..3000000 {
if let Err(er) = c_tf.add(i).await {
println!("{}", er);
};
}
});
c.await?;
a.await?;
b.await?;
println!("test b count:{} value:{} time:{} qps:{}",
tf.get_count().await?,
tf.get().await?,
start.elapsed().as_secs_f32(),
tf.get_count().await? / start.elapsed().as_millis() as u64 * 1000);
}
Ok(())
} ```
```rust
use aqueue::{Actor,AResult}; use std::sync::Arc; use std::error::Error; use std::time::Instant; use asynctrait::asynctrait; use anyhow::*;
struct Foo{ count:u64, i:i128 }
impl Foo{
pub fn add(&mut self,x:i32)->i128{
self.count+=1;
self.i+=x as i128;
self.i
}
fn reset(&mut self){
self.count=0;
self.i=0;
}
pub fn get(&self)->i128{
self.i
}
pub fn get_count(&self)->u64{
self.count
}
}
pub trait FooRunner{
async fn add(&self,x:i32)->Result
impl FooRunner for Actor
async fn add(&self,x:i32)->Result
async fn reset(&self)->Result<()>{
self.innercall(async move |inner| {
Ok(inner.getmut().reset())
}).await
}
async fn get(&self)->Result
async fn getcount(&self)->Result
async fn main()->Result<()> { { // Single thread test let tf = Arc::new(Actor::new(Foo::default())); tf.add(100).await?; asserteq!(100, tf.get().await?); tf.add(-100).await.unwrap(); asserteq!(0, tf.get().await?); tf.reset().await?;
let start = Instant::now();
for i in 0..2000000 {
if let Err(er) = tf.add(i).await {
println!("{}", er);
};
}
println!("test a count:{} value:{} time:{} qps:{}",
tf.get_count().await?,
tf.get().await?,
start.elapsed().as_secs_f32(),
tf.get_count().await? / start.elapsed().as_millis() as u64 * 1000);
}
{
//Multithreading test
let tf = Arc::new(Actor::new(Foo::default()));
let start = Instant::now();
let a_tf = tf.clone();
let a = tokio::spawn(async move {
for i in 0..1000000 {
if let Err(er) = a_tf.add(i).await {
println!("{}", er);
};
}
});
let b_tf = tf.clone();
let b = tokio::spawn(async move {
for i in 1000000..2000000 {
if let Err(er) = b_tf.add(i).await {
println!("{}", er);
};
}
});
let c_tf = tf.clone();
let c = tokio::spawn(async move {
for i in 2000000..3000000 {
if let Err(er) = c_tf.add(i).await {
println!("{}", er);
};
}
});
c.await?;
a.await?;
b.await?;
println!("test b count:{} value:{} time:{} qps:{}",
tf.get_count().await?,
tf.get().await?,
start.elapsed().as_secs_f32(),
tf.get_count().await? / start.elapsed().as_millis() as u64 * 1000);
}
Ok(())
} ```