futures-async-stream


Async stream for Rust and the futures crate.

This crate provides useful features for streams, using async_await and unstable generators.

Usage

Add this to your Cargo.toml:

```toml
[dependencies]
futures-async-stream = "0.1.0-alpha.6"
futures-preview = "0.3.0-alpha.18"
```

The current futures-async-stream requires Rust nightly 2019-08-21 or later.

#[for_await]

Processes streams using a for loop.

This is a reimplementation of [futures-await]'s #[async] for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.

```rust
#![feature(stmt_expr_attributes, proc_macro_hygiene)]

use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}
```

value has the Item type of the stream passed in. Note that async for loops can only be used inside of async functions, closures, blocks, #[async_stream] functions and async_stream_block! macros.

#[async_stream]

Creates streams via generators.

This is a reimplementation of [futures-await]'s #[async_stream] for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.

```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::async_stream;

// Returns a stream of i32
#[async_stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `async_stream`. If you use `for_await` only in
    // `async_stream`, there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}
```

#[async_stream] must have an item type specified via item = some::Path and the values output from the stream must be yielded via the yield expression.

async_stream_block!

You can create a stream directly as an expression using an async_stream_block! macro:

```rust
#![feature(generators, proc_macro_hygiene)]

use futures::stream::Stream;
use futures_async_stream::async_stream_block;

fn foo() -> impl Stream<Item = i32> {
    async_stream_block! {
        for i in 0..10 {
            yield i;
        }
    }
}
```
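For comparison, here is a rough hand-written counterpart of the stream above that uses only the standard library. The `Stream` trait is a local stand-in assumed to have the same shape as `futures::stream::Stream`, and `Counter`, `NoopWake`, and `drain` are hypothetical names introduced for this sketch. It illustrates the behavior (yield `0` through `9`, then end) and is not what the macro actually expands to.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::stream::Stream` (assumption: same shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical hand-written state machine: yields 0, 1, ..., 9 and then ends.
struct Counter {
    next: i32,
}

impl Stream for Counter {
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if self.next < 10 {
            let value = self.next;
            self.next += 1;
            Poll::Ready(Some(value)) // plays the role of `yield i`
        } else {
            Poll::Ready(None) // the loop has finished, so the stream ends
        }
    }
}

// Waker that does nothing; sufficient here because `Counter` is always ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream to completion and collects every item it produces.
fn drain<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut items = Vec::new();
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

Calling `drain(Counter { next: 0 })` collects the ten values in order. The macro form avoids writing the state machine by hand, which is the main convenience it offers.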

Using async stream functions in traits

You can use async stream functions in traits by passing boxed or boxed_local as an argument.

```rust
#![feature(generators)]

use futures_async_stream::async_stream;

trait Foo {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}
```

An async stream function that receives a boxed argument is converted to a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>. If you pass boxed_local instead of boxed, the async stream function returns a non-threadsafe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>).

```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::async_stream;
use std::pin::Pin;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}
```
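To make the boxed signature concrete, the following standard-library-only sketch writes the trait implementation by hand, without the attribute. The `Stream` trait is a local stand-in assumed to match `futures::stream::Stream`, and `BarStream`, `NoopWake`, and `drain` are hypothetical helpers for illustration. It shows how a stream that borrows `&mut self` can be type-erased into `Pin<Box<dyn Stream<Item = u32> + Send + '_>>`; the code the macro generates may differ.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::stream::Stream` (assumption: same shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

trait Foo {
    // The return type described for `boxed`; `'_` ties the stream to `&mut self`.
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

// Hypothetical helper: borrows `Bar` for as long as the stream is alive.
struct BarStream<'a>(&'a mut Bar);

impl Stream for BarStream<'_> {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let bar = &mut *self.0;
        if bar.0 < u32::MAX {
            bar.0 += 1;
            Poll::Ready(Some(bar.0)) // plays the role of `yield self.0`
        } else {
            Poll::Ready(None)
        }
    }
}

impl Foo for Bar {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>> {
        // Box and pin the concrete stream, then erase its type.
        Box::pin(BarStream(self))
    }
}

// Waker that does nothing; sufficient because `BarStream` is always ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a boxed stream to completion and collects its items.
fn drain<T>(mut stream: Pin<Box<dyn Stream<Item = T> + Send + '_>>) -> Vec<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

Because the returned trait object carries the `'_` lifetime, `Bar` stays mutably borrowed until the stream is dropped. Dropping the `Send` bound from the return type would correspond to the `boxed_local` form.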

#[async_try_stream] and async_try_stream_block!

The ? operator can be used with #[async_try_stream] and async_try_stream_block!. The Item of the returned stream is Result, with Ok being the value yielded and Err the error type returned by the ? operator or return Err(...).

```rust
#![feature(generators)]

use futures::stream::Stream;
use futures_async_stream::async_try_stream;

#[async_try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync>)]
async fn foo(stream: impl Stream<Item = String>) {
    #[for_await]
    for x in stream {
        yield x.parse()?;
    }
}
```
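The item shape of a try stream can be illustrated with a hand-written, standard-library-only sketch. It assumes that a failing `?` surfaces as one final `Err` item, after which the stream ends. The `Stream` trait is a local stand-in assumed to match `futures::stream::Stream`, and `ParseAll`, `parse_all`, `BoxError`, `NoopWake`, and `drain` are hypothetical names; this is not the macro's actual expansion.

```rust
use std::error::Error;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::stream::Stream` (assumption: same shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

type BoxError = Box<dyn Error + Send + Sync>;

// Hypothetical hand-written counterpart of a try stream over string inputs.
struct ParseAll {
    inputs: std::vec::IntoIter<String>,
    failed: bool,
}

fn parse_all(inputs: Vec<String>) -> ParseAll {
    ParseAll { inputs: inputs.into_iter(), failed: false }
}

impl Stream for ParseAll {
    type Item = Result<i32, BoxError>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.failed {
            return Poll::Ready(None); // an earlier error already ended the stream
        }
        match self.inputs.next() {
            None => Poll::Ready(None),
            Some(text) => match text.parse::<i32>() {
                Ok(value) => Poll::Ready(Some(Ok(value))), // plays the role of `yield`
                Err(err) => {
                    self.failed = true;
                    Poll::Ready(Some(Err(err.into()))) // plays the role of `?`
                }
            },
        }
    }
}

// Waker that does nothing; sufficient because `ParseAll` is always ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream to completion and collects every item it produces.
fn drain<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut items = Vec::new();
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

Draining `parse_all` over the inputs `"1"`, `"2"`, `"x"`, `"4"` gives two `Ok` items followed by one `Err`; the trailing `"4"` is never parsed because the stream stops at the first error.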

How to write the equivalent code without this API?

#[for_await]

You can write this by combining a while let loop, .await, the pin_mut! macro, and the StreamExt::next() method:

```rust
use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}
```

#[async_stream]

You can write this by manually implementing the combinator:

```rust
use futures::{
    ready,
    stream::Stream,
    task::{Context, Poll},
};
use pin_project::pin_project;
use std::pin::Pin;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}
```

License

Licensed under either of the Apache License, Version 2.0 or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.