futures-batch


An adaptor that chunks up completed futures in a stream and flushes them after a timeout or when the buffer is full. It is based on the Chunks adaptor of futures-util, to which we added a timeout.

(The project was initially called tokio-batch, but was renamed as it has no dependency on Tokio anymore.)

Usage

Use it either as a standalone stream operator or directly as a combinator:

```rust
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
    let results = stream::iter(iter)
        .chunks_timeout(5, Duration::new(10, 0))
        .collect::<Vec<_>>();

    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results.await);
}
```

The above code iterates over a stream and groups its items into chunks of up to 5 elements; a chunk is also flushed if the 10-second timeout expires before it fills up.
Note: This example uses the futures 0.3 crate.
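
To make the "flush when full or after a timeout" rule more concrete, here is a minimal synchronous sketch of the same idea using only the standard library. It is not how futures-batch is implemented: `batch_with_timeout` is a hypothetical helper, it reads from a `std::sync::mpsc` channel instead of a stream, and it assumes the timeout is measured from the first item of each batch.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of futures-batch): groups items from `rx`
/// into batches of at most `cap` items. A partial batch is flushed once
/// `timeout` has elapsed since its first item arrived (an assumption of
/// this sketch), or when the sender disconnects.
fn batch_with_timeout<T>(rx: &Receiver<T>, cap: usize, timeout: Duration) -> Vec<Vec<T>> {
    assert!(cap > 0, "batch capacity must be non-zero");
    let mut batches = Vec::new();
    let mut buf: Vec<T> = Vec::with_capacity(cap);
    // `Some` only while `buf` holds at least one item.
    let mut deadline: Option<Instant> = None;

    loop {
        let received = match deadline {
            // Empty buffer: block until an item arrives or the sender hangs up.
            None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
            // Pending items: wait no longer than the time left until the deadline.
            Some(d) => rx.recv_timeout(d.saturating_duration_since(Instant::now())),
        };
        match received {
            Ok(item) => {
                if buf.is_empty() {
                    // First item of a new batch starts the timer.
                    deadline = Some(Instant::now() + timeout);
                }
                buf.push(item);
                if buf.len() == cap {
                    // Buffer is full: flush immediately.
                    batches.push(std::mem::replace(&mut buf, Vec::with_capacity(cap)));
                    deadline = None;
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Timer expired: flush whatever has accumulated so far.
                batches.push(std::mem::replace(&mut buf, Vec::with_capacity(cap)));
                deadline = None;
            }
            Err(RecvTimeoutError::Disconnected) => {
                // Input is finished: flush the remainder, if any.
                if !buf.is_empty() {
                    batches.push(buf);
                }
                return batches;
            }
        }
    }
}
```

For instance, if the values 0 through 6 are sent into the channel and the sender is dropped before calling `batch_with_timeout(&rx, 3, Duration::from_secs(10))`, the result is `[[0, 1, 2], [3, 4, 5], [6]]`: two full batches, plus a final partial one flushed when the input ends.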

Performance

futures-batch imposes very low overhead on your application. For example, it is even used to batch syscalls.
Under the hood, it uses futures-timer, which allows for microsecond timer resolution. If you find a use case that is not covered, feel free to open an issue.

Credits

Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!