![crates-badge] ![docs-badge] ![license-badge] ![rustc-badge]
Async stream for Rust and the futures crate.
This crate provides useful features for streams, using `async_await` and unstable `generators`.
Add this to your `Cargo.toml`:

```toml
[dependencies]
futures-async-stream = "0.2"
futures = "0.3"
```
The current futures-async-stream requires Rust nightly 2020-02-14 or later.
#[for_await]
Processes streams using a for loop.
This is a reimplementation of [futures-await]'s `#[async]` for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
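```rust
#![feature(stmt_expr_attributes, proc_macro_hygiene)]

use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}
```

`value` has the `Item` type of the stream passed in. Note that async for loops can only be used inside `async` functions, closures, blocks, `#[stream]` functions, and `stream_block!` macros.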
#[stream]
Creates streams via generators.
This is a reimplementation of [futures-await]'s `#[stream]` for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::stream;

// Returns a stream of i32
#[stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `stream`. If you use `for_await` only in `stream`,
    // there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}
```
`#[stream]` on an async fn must have an item type specified via `item = some::Path`, and the values output from the stream must be yielded via the `yield` expression.
`#[stream]` can also be used on async blocks:
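```rust
#![feature(generators, stmt_expr_attributes, proc_macro_hygiene)]

use futures::stream::Stream;
use futures_async_stream::stream;

fn foo() -> impl Stream<Item = i32> {
    #[stream]
    async move {
        for i in 0..10 {
            yield i;
        }
    }
}
```

Note that `#[stream]` on an async block does not require the `item` argument, but it may require additional type annotations.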
You can use async stream functions in traits by passing `boxed` or `boxed_local` as an argument.
```rust
#![feature(generators)]

use futures_async_stream::stream;

trait Foo {
    #[stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}
```
An async stream function that receives a `boxed` argument is converted to a function that returns `Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>`.
If you pass `boxed_local` instead of `boxed`, the async stream function returns a non-thread-safe stream (`Pin<Box<dyn Stream<Item = item> + 'lifetime>>`).
```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::stream;
use std::pin::Pin;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}
```
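To make the two boxed return types concrete, here is a minimal sketch that builds them by hand, without the macro. It assumes a local `Stream` trait that mirrors the shape of `futures::stream::Stream`, so the example compiles with the standard library alone. `CountUp`, `method_local`, `drain`, `NoopWake`, and `assert_send` are hypothetical names used only for illustration; this is not the code that `#[stream]` generates.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Assumption: a local stand-in with the same shape as `futures::stream::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical hand-written stream: increments a borrowed counter a fixed number of times.
struct CountUp<'a> {
    counter: &'a mut u32,
    remaining: u32,
}

impl Stream for CountUp<'_> {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut(); // fine because `CountUp` is `Unpin`
        if this.remaining == 0 {
            return Poll::Ready(None);
        }
        this.remaining -= 1;
        *this.counter += 1;
        Poll::Ready(Some(*this.counter))
    }
}

struct Bar(u32);

impl Bar {
    // Same return type shape as described for `boxed`: thread-safe, borrows `self`.
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>> {
        Box::pin(CountUp { counter: &mut self.0, remaining: 3 })
    }

    // Same return type shape as described for `boxed_local`: no `Send` bound.
    fn method_local(&mut self) -> Pin<Box<dyn Stream<Item = u32> + '_>> {
        Box::pin(CountUp { counter: &mut self.0, remaining: 3 })
    }
}

// Waker that does nothing; enough here because `CountUp` never returns `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a boxed stream until it stops producing ready items and collects them.
fn drain<S>(mut stream: Pin<Box<S>>) -> Vec<u32>
where
    S: Stream<Item = u32> + ?Sized,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(x)) = stream.as_mut().poll_next(&mut cx) {
        out.push(x);
    }
    out
}

// Compiles only when the argument type is `Send`.
fn assert_send<T: Send>(_: &T) {}
```

With `let mut bar = Bar(0);`, the call `assert_send(&bar.method())` compiles, while `assert_send(&bar.method_local())` would be rejected because the `boxed_local` shape carries no `Send` bound. Both streams can still be driven the same way, for example with `drain(bar.method())`.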
#[try_stream]
The `?` operator can be used with `#[try_stream]`. The `Item` of the returned stream is `Result`, with `Ok` being the value yielded and `Err` the error type returned by the `?` operator or `return Err(...)`.
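```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::try_stream;

#[try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync>)]
async fn foo(stream: impl Stream<Item = String>) {
    #[for_await]
    for x in stream {
        yield x.parse()?;
    }
}
```

`#[try_stream]` can be used wherever `#[stream]` can be used.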
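The `Result` item shape can be illustrated with a synchronous analogy. The sketch below is not a stream and does not use `#[try_stream]`. `parse_all` is a hypothetical function that collects into a `Vec` the items one would expect from a try-stream with `ok = i32` and a parse error type, assuming the stream ends after the first error, just as `?` ends an ordinary function.

```rust
use std::num::ParseIntError;

// Hypothetical synchronous stand-in: each successful parse becomes `Ok(value)`
// (the role of `yield`), and the first failure becomes a final `Err(error)`
// (the role of `?`), after which nothing more is produced.
fn parse_all(inputs: &[&str]) -> Vec<Result<i32, ParseIntError>> {
    let mut items = Vec::new();
    for s in inputs {
        match s.parse::<i32>() {
            Ok(value) => items.push(Ok(value)),
            Err(error) => {
                items.push(Err(error));
                break; // assumption: no items follow the error
            }
        }
    }
    items
}
```

For the input `["1", "2", "x", "4"]`, this produces `Ok(1)`, `Ok(2)`, and one `Err`; the trailing `"4"` is never parsed.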
#[for_await]
You can write this by combining a `while let` loop, `.await`, the `pin_mut!` macro, and the `StreamExt::next()` method:
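```rust
use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}
```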
#[stream]
You can write this by manually implementing the combinator:
```rust
use futures::{
    ready,
    stream::Stream,
    task::{Context, Poll},
};
use pin_project::pin_project;
use std::pin::Pin;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}
```
Licensed under either of the Apache License, Version 2.0 or the MIT license, at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.