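- consumes items from an external async source described by a `future`
- buffers items internally up to `hard_cap`
- once `hard_cap` is reached, it no longer consumes, causing the producer to back off and slow down
- releases buffered content when either:
  - `release_after` has passed since the latest successful content release (or since start) and the buffer is not empty
  - `soft_cap` of items were added
- released content must be either `confirmed` or `returned` to the buffer
- exposes a `future` the user can `await` on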
```toml
[dependencies]
relabuf = "~0.15.0"
```
Example:
```rust
use anyhow::Context;
use flume::{bounded, Sender};
use relabuf::{ExponentialBackoff, RelaBuf, RelaBufConfig};
use std::time::{Duration, Instant};
use async_io::Timer;

async fn producer(tx: Sender
let t = Instant::now();
let r = tx.send_async(i).await;
println!("emit for {} took {:?}: {:?}", i, t.elapsed(), r);
}
println!("producer is finished!")
}
/// Wires a producer to a `RelaBuf` and consumes the released batches.
///
/// The first 7 batches are handed back to the buffer to simulate a failing
/// consumer; every later batch is confirmed.
// `async fn main` needs a runtime entry point, and the body relies on
// `tokio::spawn`, hence the tokio attribute.
#[tokio::main]
async fn main() {
    // Channel between the producer task and the buffer's item source.
    let (tx, rx) = bounded(5);
    tokio::spawn(producer(tx));

    let opts = RelaBufConfig {
        // Release once this many items were added.
        soft_cap: 3,
        // Stop consuming from the source once this many items are buffered.
        hard_cap: 5,
        // Release a non-empty buffer after this long without a release.
        release_after: Duration::from_secs(5),
        backoff: Some(ExponentialBackoff {
            max_elapsed_time: None,
            ..ExponentialBackoff::default()
        }),
    };

    // The closure builds the future that yields the next item; the receiver is
    // cloned per call so each future owns its own handle.
    let (buf, proxy) = RelaBuf::new(opts, move || {
        let rx = rx.clone();
        Box::pin(async move { rx.recv_async().await.context("cannot read") })
    });
    // NOTE(review): presumably `proxy.go()` is the task that moves items from
    // the source into the buffer - confirm against the relabuf docs.
    tokio::spawn(proxy.go());

    let mut i = 0;
    while let Ok(consumed) = buf.next().await {
        i += 1;
        if i <= 7 {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?} - returning due to err",
                consumed.items, consumed.reason, consumed.elapsed
            );
            // Simulated failure: give the batch back to the buffer.
            consumed.return_on_err();
        } else {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?}",
                consumed.items, consumed.reason, consumed.elapsed
            );
            consumed.confirm();
        }
    }
    println!("done ;)");
}
```