Async stream for Rust and the futures crate.
This crate provides useful features for streams, using async_await and unstable generators.
Add this to your Cargo.toml:
toml
[dependencies]
futures-async-stream = "0.1.0-alpha.6"
futures-preview = "0.3.0-alpha.18"
The current futures-async-stream requires Rust nightly 2019-08-21 or later.
Processes streams using a for loop.
This is a reimplement of [futures-await]'s #[async] for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
```rust
use futures::stream::Stream; use futuresasyncstream::for_await;
async fn collect(stream: impl Stream
value has the Item type of the stream passed in. Note that async for loops can only be used inside of async functions, closures, blocks, #[async_stream] functions and async_stream_block! macros.
Creates streams via generators.
This is a reimplement of [futures-await]'s #[async_stream] for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
```rust
use futures::stream::Stream; use futuresasyncstream::async_stream;
// Returns a stream of i32
async fn foo(stream: impl Streamfor_await is built into async_stream. If you use for_await only in async_stream, there is no need to import for_await.
#[for_await]
for x in stream {
yield x.parse().unwrap();
}
}
```
#[async_stream] must have an item type specified via item = some::Path and the values output from the stream must be yielded via the yield expression.
You can create a stream directly as an expression using an async_stream_block! macro:
```rust
use futures::stream::Stream; use futuresasyncstream::asyncstreamblock;
fn foo() -> impl Stream
You can use async stream functions in traits by passing boxed or boxed_local as an argument.
```rust
use futuresasyncstream::async_stream;
trait Foo { #[async_stream(boxed, item = u32)] async fn method(&mut self); }
struct Bar(u32);
impl Foo for Bar { #[asyncstream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::maxvalue() { self.0 += 1; yield self.0; } } } ```
A async stream function that received a boxed argument is converted to a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>.
If you passed boxed_local instead of boxed, async stream function returns a non-threadsafe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>).
```rust
use futures::stream::Stream; use futuresasyncstream::async_stream; use std::pin::Pin;
// The trait itself can be defined without unstable features.
trait Foo {
fn method(&mut self) -> Pin
struct Bar(u32);
impl Foo for Bar { #[asyncstream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::maxvalue() { self.0 += 1; yield self.0; } } } ```
? operator can be used with the #[async_try_stream] and async_try_stream_block!. The Item of the returned stream is Result with Ok being the value yielded and Err the error type returned by ? operator or return Err(...).
```rust
use futures::stream::Stream; use futuresasyncstream::asynctrystream;
async fn foo(stream: impl Stream
You can write this by combining while let loop, .await, pin_mut macro, and StreamExt::next() method:
```rust use futures::{ pin_mut, stream::{Stream, StreamExt}, };
async fn collect(stream: impl Stream
You can write this by manually implementing the combinator:
```rust use futures::{ stream::Stream, ready, task::{Context, Poll}, }; use pinproject::pinproject; use std::pin::Pin;
fn foo(stream: S) -> impl Stream
struct Foo {
#[pin]
stream: S,
}
impl Stream for Foo
where
S: Stream
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
Poll::Ready(Some(x.parse().unwrap()))
} else {
Poll::Ready(None)
}
}
} ```
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.