Async stream for Rust and the futures crate.
This crate provides useful features for streams, using async_await
and unstable generators
.
Add this to your Cargo.toml
:
toml
[dependencies]
futures-async-stream = "0.1.0-alpha.7"
futures-preview = "0.3.0-alpha.18"
The current futures-async-stream requires Rust nightly 2019-08-21 or later.
Processes streams using a for loop.
This is a reimplement of [futures-await]'s #[async]
for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
```rust
use futures::stream::Stream; use futuresasyncstream::for_await;
async fn collect(stream: impl Stream
value
has the Item
type of the stream passed in. Note that async for loops can only be used inside of async
functions, closures, blocks, #[async_stream]
functions and async_stream_block!
macros.
Creates streams via generators.
This is a reimplement of [futures-await]'s #[async_stream]
for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
```rust
use futures::stream::Stream; use futuresasyncstream::async_stream;
// Returns a stream of i32
async fn foo(stream: impl Streamfor_await
is built into async_stream
. If you use for_await
only in async_stream
, there is no need to import for_await
.
#[for_await]
for x in stream {
yield x.parse().unwrap();
}
}
```
#[async_stream]
must have an item type specified via item = some::Path
and the values output from the stream must be yielded via the yield
expression.
You can create a stream directly as an expression using an async_stream_block!
macro:
```rust
use futures::stream::Stream; use futuresasyncstream::asyncstreamblock;
fn foo() -> impl Stream
You can use async stream functions in traits by passing boxed
or boxed_local
as an argument.
```rust
use futuresasyncstream::async_stream;
trait Foo { #[async_stream(boxed, item = u32)] async fn method(&mut self); }
struct Bar(u32);
impl Foo for Bar { #[asyncstream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::maxvalue() { self.0 += 1; yield self.0; } } } ```
A async stream function that received a boxed
argument is converted to a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>
.
If you passed boxed_local
instead of boxed
, async stream function returns a non-threadsafe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>
).
```rust
use futures::stream::Stream; use futuresasyncstream::async_stream; use std::pin::Pin;
// The trait itself can be defined without unstable features.
trait Foo {
fn method(&mut self) -> Pin
struct Bar(u32);
impl Foo for Bar { #[asyncstream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::maxvalue() { self.0 += 1; yield self.0; } } } ```
?
operator can be used with the #[async_try_stream]
and async_try_stream_block!
. The Item
of the returned stream is Result
with Ok
being the value yielded and Err
the error type returned by ?
operator or return Err(...)
.
```rust
use futures::stream::Stream; use futuresasyncstream::asynctrystream;
async fn foo(stream: impl Stream
You can write this by combining while let
loop, .await
, pin_mut
macro, and StreamExt::next()
method:
```rust use futures::{ pin_mut, stream::{Stream, StreamExt}, };
async fn collect(stream: impl Stream
You can write this by manually implementing the combinator:
```rust use futures::{ stream::Stream, ready, task::{Context, Poll}, }; use pinproject::pinproject; use std::pin::Pin;
fn foo(stream: S) -> impl Stream
struct Foo {
#[pin]
stream: S,
}
impl Stream for Foo
where
S: Stream
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
Poll::Ready(Some(x.parse().unwrap()))
} else {
Poll::Ready(None)
}
}
} ```
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.