提供rust异步并发框架,底层基于非阻塞的IO操作和事件驱动的机制来实现.
A runtime for wrting asynchronous applications with the Rust programming language, based on event-driven, non-blocking I/O platfrom.
曾经在工作中深入对比过已有的C/Rust并发框架的并发性能,C版本的私有实现在各个场景下的测试数据都比tokio的好,但Rust版本在编码效率和难度上都胜于C版本. 此外C版本本身在嵌入式环境上使用,内存和磁盘资源都非常受限,因此Rust并发框架支持no_std
是非常有必要的.
We have compared the concurrent performance of the existing C/Rust concurrent framework in our work. The test data of the proprietary implementation of the C version is better than that of the Tokio version in all scenarios, but the Rust version is better than that of the C version in terms of coding efficiency and difficulty. In addition, the C version itself is used in embedded environments, and memory and disk resources are very limited. Therefor, it is necessary for the Rust concurrent framework to support no_std
.
no_std
no_std
环境也需要一个异步并发框架,现在广泛使用的tokio等并不支持. 本crate基于linux libc的能力构建. 因为内部使用了linux的eventfd/epoll,当前还仅支持linux.
spawn系列接口基本上同tokio的定义,只要在运行时创建之后可以在同步和异步环境的任何时候调用它, 没有调用上下文的约束. 此外其返回的JoinHandle提供join接口,供同步环境中等待异步任务的结束. 注意异步函数中,不能调用join,否则会阻塞当前工作线程.
阻塞等待异步任务结束,一般用于业务层的异步任务的入口函数的调用. 这类似标准库thread::scope的使用方式.
```rust use hirun::runtime::{Builder, block_on};
fn main() { Builder::new().build().unwrap(); let val = blockon(asyncmain(100)).unwrap(); println!("async_main return {val}"); }
async fn async_main(val: i32) -> i32 { val + 100 } ```
在没有统一的异步任务入口的时候,只是在同步流程的某些环节利用异步并发机制提升并发度,那么可以利用spawn接口,在需要的时候调用join等待异步任务返回,这样异步任务和同步环境可并发执行. 这类似标准库thread::spawn的使用方式.
```rust use hirun::runtime::{Builder, spawn};
fn main() { Builder::new().build().unwrap(); let val = spawn(foo(100)).join().unwrap(); println!("async foo return: {val}"); }
async fn foo(val: i32) -> i32 { val + 100 } ```
包含阻塞操作的异步任务最好在单独的运行时实例中调度,避免对其他异步任务的影响. 可在spawn_with
接口中按需指定运行时实例.
``` rust use hirun::runtime::{Builder, spawn, spawn_with, Attr};
const BLOCKRUNTIMEID: u8 = 1;
fn main() { Builder::new().build().unwrap(); Builder::new().id(BLOCKRUNTIMEID).build().unwrap(); let h1 = spawn(foo(100)); let h2 = spawnwith(bar(200), Attr::new().id(BLOCKRUNTIMEID)); println!("default runtime: foo return {}", h1.join().unwrap()); println!("runtime1: bar return {}", h2.join().unwrap()); }
async fn foo(val: i32) -> i32 { val + 100 }
async fn bar(val: i32) -> i32 { val + 1000 } ```
业务上有需要约束某些任务必须在一个线程或者不同线程中调度,业务层可以为任务指定hash值实现这个功能. 使用这个功能应该要了解运行时的工作线程的数量, 才能利用hash值达到自身的控制目标.
以下代码强制异步任务一定在同一个工作线程运行.
```rust use hirun::runtime::{Builder, spawnwith, Attr}; use libc::pthreadself;
fn main() { Builder::new().nth(2).build().unwrap(); let h1 = spawnwith(foo(200), Attr::new().hash(1)); let h2 = spawnwith(bar(200), Attr::new().hash(1)); println!("foo return {}", h1.join().unwrap()); println!("bar return {}", h2.join().unwrap()); }
async fn foo(val: i32) -> i32 { println!("pthreadid: {}", unsafe { pthreadself() }); val + 100 }
async fn bar(val: i32) -> i32 { println!("pthreadid: {}", unsafe { pthreadself() }); val + 1000 } ```
批量分发异步任务后,可能有需要等待所有任务执行完毕后返回,也可能等待最先完成的任务返回,可利用JoinSet实现.
以下等待所有任务完成后再返回.
```rust use hirun::runtime::{Builder, block_on, spawn, JoinSet};
fn main() { Builder::new().nth(2).build().unwrap(); blockon(async { let mut set = JoinSet::new(); let _ = set.spawn(foo(100)); let _ = set.spawn(bar(200)); for (seqno, val) in set.waitall().await { println!("{seqno}, return {}", val.unwrap()); } }); }
async fn foo(val: i32) -> i32 { val + 100 }
async fn bar(val: i32) -> i32 { val + 1000 } ```
也可以基于任务完成的先后顺序进行处理.
```rust use hirun::runtime::{Builder, block_on, spawn, JoinSet, sleep}; use core::time::Duration;
fn main() { Builder::new().nth(2).build().unwrap(); blockon(async { let mut set = JoinSet::new(); let _ = set.spawn(foo(100)); let _ = set.spawn(bar(200)); while let Some((seqno, val)) = set.waitany().await { println!("{seqno}, return {}", val.unwrap()); } }); }
async fn foo(val: i32) -> i32 { sleep(Duration::new(1, 0)).await; val + 100 }
async fn bar(val: i32) -> i32 { val + 1000 } ```
需要支持自定义的IPC通信机制,这些机制都是基于Linux的文件系统来实现的, 使用方式相同: 创建文件句柄,利用poll机制获取异步IO事件,调用read/write读写数据.Linux新的io_uring
也可基于poll机制获取提交任务的完成情况.
本crate未提供TcpLisenter/TcpStream这类高级封装,仅封装fd,即Fd,同时提供AioFd,支持异步读写和获取异步IO事件通知的功能,具有最大的普适性. 只封装了最基础的功能,更多的功能需要业务层基于libc crate的api来完成.
```rust use hirun::runtime::{Builder, block_on}; use hirun::net::{Fd, AioFd, SocketAddr}; use hirun::event::POLLIN;
fn main() { Builder::new().nth(2).build().unwrap(); let _ = blockon(async { let serveraddr = SocketAddr::inet("127.0.0.1", 2000).unwrap();
let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
let mut aiofd = AioFd::new(&fd);
aiofd.connect(&server_addr).await.unwrap();
aiofd.wait(POLLIN).await.unwrap();
let mut buf = [0_u8; 100];
if let Ok(size) = aiofd.try_read(&mut buf) {
println!("recv {size} bytes from server");
}
}).unwrap();
} ```
也可以直接使用异步读取接口:
```rust use hirun::runtime::{Builder, block_on}; use hirun::net::{Fd, AioFd, SocketAddr};
fn main() { Builder::new().nth(2).build().unwrap(); let _ = blockon(async { let serveraddr = SocketAddr::inet("127.0.0.1", 2000).unwrap();
let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
let mut aiofd = AioFd::new(&fd);
aiofd.connect(&server_addr).await.unwrap();
let mut buf = [0_u8; 100];
if let Ok(size) = aiofd.read(&mut buf).await {
println!("recv {size} bytes from server");
}
}).unwrap();
} ```
异步函数的参数一定会是异步任务的内置数据成员,而并发框架创建的异步任务都会占用堆内存空间. 如果大量任务使用异步读取接口,因为缓冲器在堆上分配, 可能导致占用的堆内存空间比较大. 如果内存资源有限,推荐使用async wait + try_read
这种组合使用方式.
#[future]
现有Rust自动判断异步函数是否支持Send的规则存在一定局限性. 一个异步函数内部仅仅是直接调用异步子函数,不会通过spawn
类接口创建并发的异步任务,那么这个异步函数内部实际上是可以安全的使用Rc这些类型.
以下代码如果async fn foo
不使用#[future]
修饰,则会报告因为Future不支持Send无法通过编译.
注意: #[future]
生成unsafe代码,将异步函数的函数体转换为支持Send,如果是异步函数的入参不支持Send,则这类异步函数只能使用spawn_local
在当前线程调度.
```rust use hirun::runtime::{Builder, spawn}; use hirun::future; use std::rc::Rc;
fn main() { Builder::new().nth(2).build().unwrap(); let h = spawn(foo(100)); println!("async foo return {}", h.join().unwrap()); }
async fn foo(val: i32) -> i32 { let rc = Rc::new(100); val + bar(*rc).await }
async fn bar(val: i32) -> i32 { val + 1000 } ```
examples/httpserver和examples/tokioserver是本crate和tokio实现的完全相同的一个测试用http server,可以利用httperf测试其性能.
启动httpserver, 服务监听端口2000:
```shell
```
启动tokioserver, 服务监听端口2001:
```shell
```
启动httperf测试, 具体测试参数参见httperf的帮助说明.
```shell
```
目前已有的数据看,不弱于tokio,不少场景下(变化因素: http_body_size
, --num-calls
, --num-conns
)比tokio更优.
在用户的使用环境上进行对比验证获取的数据最真实.