Asynchronous stream of elements.
Provides two macros, stream!
and try_stream!
, allowing the caller to
define asynchronous streams of elements. These are implemented using async
& await
notation. This crate works without unstable features.
The stream!
macro returns an anonymous type implementing the [Stream
]
trait. The Item
associated type is the type of the values yielded from the
stream. The try_stream!
also returns an anonymous type implementing the
[Stream
] trait, but the Item
associated type is Result<T, Error>
. The
try_stream!
macro supports using ?
notation as part of the
implementation.
A basic stream yielding numbers. Values are yielded using the yield
keyword. The stream block must return ()
.
```rust use async_stream::stream;
use futuresutil::pinmut; use futures_util::stream::StreamExt;
async fn main() { let s = stream! { for i in 0..3 { yield i; } };
pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
} ```
Streams may be returned by using impl Stream<Item = T>
:
```rust use async_stream::stream;
use futurescore::stream::Stream; use futuresutil::pinmut; use futuresutil::stream::StreamExt;
fn zerotothree() -> impl Stream
async fn main() { let s = zerotothree(); pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
} ```
Streams may be implemented in terms of other streams - async-stream
provides for await
syntax to assist with this:
```rust use async_stream::stream;
use futurescore::stream::Stream; use futuresutil::pinmut; use futuresutil::stream::StreamExt;
fn zerotothree() -> impl Stream
fn double
async fn main() { let s = double(zerotothree()); pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
} ```
Rust try notation (?
) can be used with the try_stream!
macro. The Item
of the returned stream is Result
with Ok
being the value yielded and
Err
the error type returned by ?
.
```rust use tokio::net::{TcpListener, TcpStream};
use asyncstream::trystream; use futures_core::stream::Stream;
use std::io; use std::net::SocketAddr;
fn bindandaccept(addr: SocketAddr)
-> impl Stream
loop {
let (stream, addr) = listener.accept().await?;
println!("received on {:?}", addr);
yield stream;
}
}
} ```
The stream!
and try_stream!
macros are implemented using proc macros.
The macro searches the syntax tree for instances of yield $expr
and
transforms them into sender.send($expr).await
.
The stream uses a lightweight sender to send values from the stream
implementation to the caller. When entering the stream, an Option<T>
is
stored on the stack. A pointer to the cell is stored in a thread local and
poll
is called on the async block. When poll
returns.
sender.send(value)
stores the value that cell and yields back to the
caller.
async-stream
is built against the latest stable release. The minimum supported version is 1.45 due to function-like procedural macros in expression, pattern, and statement positions.
This project is licensed under the MIT license.
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in async-stream
by you, shall be licensed as MIT, without any
additional terms or conditions.