futures-batch

An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full. (The project was initially called tokio-batch, but was renamed as it has no dependency on Tokio anymore.)

Description

An adaptor that chunks up elements in a vector.

This adaptor buffers items from the stream and passes on the vector used for buffering once a specified capacity has been reached or a predefined timeout has fired.
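The flush-on-capacity-or-timeout behaviour described above can be illustrated with a small blocking sketch built only on the standard library. This is not how futures-batch is implemented: it swaps the asynchronous stream for a `std::sync::mpsc` channel, and the names `next_batch` and `collect_batches` are hypothetical helpers introduced for illustration. The sketch also assumes that the timeout is measured from the first item placed in the buffer.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper: collects the next batch from `rx`. Returns once
/// `capacity` items are buffered, the timeout expires, or the channel closes.
/// Returns `None` when the channel is closed and nothing is left to flush.
fn next_batch<T>(rx: &Receiver<T>, capacity: usize, timeout: Duration) -> Option<Vec<T>> {
    assert!(capacity > 0, "capacity must be non-zero");
    // Block until the first item arrives; the timer only starts afterwards
    // (an assumption of this sketch).
    let first = rx.recv().ok()?;
    let mut batch = Vec::with_capacity(capacity);
    batch.push(first);
    let deadline = Instant::now() + timeout;

    while batch.len() < capacity {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Timeout: flush a partial batch. Disconnected: flush what is left.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}

/// Hypothetical helper: drains the channel into batches until every sender
/// has been dropped.
fn collect_batches<T>(rx: Receiver<T>, capacity: usize, timeout: Duration) -> Vec<Vec<T>> {
    let mut out = Vec::new();
    while let Some(batch) = next_batch(&rx, capacity, timeout) {
        out.push(batch);
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..7 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the final partial batch is flushed

    let batches = collect_batches(rx, 3, Duration::from_millis(50));
    println!("{:?}", batches); // prints [[0, 1, 2], [3, 4, 5], [6]]
}
```

Two full batches are flushed because the capacity of 3 is reached; the last batch is partial and is flushed because no further items arrive.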

Usage

Use it either as a standalone stream operator or directly as a combinator:

```rust
use futures::future;
use futures::stream;
use futures::{FutureExt, StreamExt, TryFutureExt};
use std::time::Duration;
use futures_batch::ChunksTimeoutStreamExt;

fn main() {
    let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
    // Emit chunks of up to 5 items, or whatever is buffered after 10 seconds.
    let v = stream::iter(iter)
        .chunks_timeout(5, Duration::new(10, 0))
        .collect::<Vec<_>>();

    tokio::run(
        v.then(|res| {
            assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], res);
            future::ready(())
        })
        .unit_error()
        .boxed()
        .compat(),
    );
}
```

Note: This example uses the futures-preview crate. Check this blog post about the futures-rs compatibility layer.

Performance

futures-batch imposes very low overhead on your application. For example, it is even used to batch syscalls.
Under the hood, we use futures-timer, which allows for microsecond timer resolution. If you find a use case that is not covered, feel free to open an issue.

Credits

This was taken and adjusted from futures-util and moved into a separate crate for reusability. Since then it has been modified to support higher-resolution timers.

Thanks to arielb1, alexcrichton, doyoubi, spebern, wngr for their contributions!