This crate emphasises the [first-class] nature of [asynchronous streams] in Rust by deriving the value construction and pattern matching operations from [ADTs], as depicted by the following correspondence:

| ADTs | Streams |
|----------|----------|
| [Value construction] | [Multiplexing] |
| [Pattern matching] | [Demultiplexing] |

Copy this into your `Cargo.toml`:

```toml
[dependencies]
mux-stream = "0.2"
tokio = "0.3"
futures = "0.3"
```

In many problem domains, we encounter the need to process incoming hierarchical structures. Suppose you're writing a social network, and the following kinds of updates might come at any moment:
In terms of Rust, you might want to express such updates via [sum types]:
```rust
enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
    MuteFriend(MuteFriendReq)
}

enum SendMsgReq {
    Photo(...),
    Video(...),
    Text(...)
}

struct FollowReq {
    ...
}

enum MuteFriendReq {
    Forever(...),
    ForInterval(...)
}
```
This is where the story begins: now you need to process user requests. Let's formulate some general requirements for request-processing code:
This crate addresses all of the aforementioned requirements. The approach is functional and asynchronous: we augment asynchronous data streams with [pattern matching]. Your code would reflect the following structure (continuing the social network example):
(Note the similarities with the [chain-of-responsibility pattern].)
That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them to processors in lower layers, thereby adhering to the single-responsibility principle. What is more, you can use the full power of [stream adaptors], which let you handle updates declaratively as chains rather than as single objects.
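The layering described above can be sketched without any asynchronous machinery. The following is a minimal, synchronous illustration using only the standard library, not this crate's API. The payload types (`String`, `user_id`) and the function names are hypothetical, since the definitions above elide them, and the `MuteFriend` and `Video` cases are left out for brevity.

```rust
// Hypothetical payloads: the variant contents are elided in the text above.
enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
}

enum SendMsgReq {
    Photo(String),
    Text(String),
}

struct FollowReq {
    user_id: u32,
}

// Top layer: splits user requests by variant and hands each group
// to the lower-layer processor responsible for it.
fn process_user_reqs(reqs: Vec<UserReq>) -> Vec<String> {
    let mut msgs = Vec::new();
    let mut follows = Vec::new();
    for req in reqs {
        match req {
            UserReq::SendMsg(m) => msgs.push(m),
            UserReq::Follow(f) => follows.push(f),
        }
    }
    let mut log = process_send_msg_reqs(msgs);
    log.extend(process_follow_reqs(follows));
    log
}

// Middle layer: knows only about message requests and splits them further.
fn process_send_msg_reqs(reqs: Vec<SendMsgReq>) -> Vec<String> {
    let mut photos = Vec::new();
    let mut texts = Vec::new();
    for req in reqs {
        match req {
            SendMsgReq::Photo(url) => photos.push(url),
            SendMsgReq::Text(body) => texts.push(body),
        }
    }
    let mut log: Vec<String> = photos
        .into_iter()
        .map(|url| format!("photo: {}", url))
        .collect();
    log.extend(texts.into_iter().map(|body| format!("text: {}", body)));
    log
}

// Leaf layer: knows only about follow requests.
fn process_follow_reqs(reqs: Vec<FollowReq>) -> Vec<String> {
    reqs.into_iter()
        .map(|req| format!("follow: {}", req.user_id))
        .collect()
}
```

Each function sees only the type it is responsible for. In the asynchronous setting the `Vec`s become streams and the `match` loops become demultiplexing, but the shape of the code stays the same.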
The sections below are dedicated to demultiplexing and multiplexing separately. See also [examples/admin_panel.rs], an elaborate demonstration of the most prominent aspects of the paradigm.
Given `Stream<T1 | ... | Tn>`, demultiplexing produces `Stream<T1>, ..., Stream<Tn>`. See the illustration below, in which every circle is an item of a stream and has a type (its colour):
That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate [Tokio task]. No output stream can slow down another one.
[examples/demux.rs]
```rust
use mux_stream::{demux, error_handler};

use futures::{future::FutureExt, StreamExt};
use tokio::stream;

enum MyEnum {
    A(i32),
    B(f64),
    C(&'static str),
}

let stream = stream::iter(vec![
    MyEnum::A(123),
    MyEnum::B(24.241),
    MyEnum::C("Hello"),
    MyEnum::C("ABC"),
    MyEnum::A(811),
]);

// One output stream per listed variant, in the same order.
let (mut i32_stream, mut f64_stream, mut str_stream) =
    demux!(MyEnum { A, B, C })(stream, error_handler::panicking());

assert_eq!(i32_stream.next().await, Some(123));
assert_eq!(i32_stream.next().await, Some(811));
assert_eq!(i32_stream.next().await, None);

assert_eq!(f64_stream.next().await, Some(24.241));
assert_eq!(f64_stream.next().await, None);

assert_eq!(str_stream.next().await, Some("Hello"));
assert_eq!(str_stream.next().await, Some("ABC"));
assert_eq!(str_stream.next().await, None);
```
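To make the mechanics concrete, here is a rough standard-library analogue of demultiplexing, with an OS thread standing in for the Tokio task and `std::sync::mpsc` channels standing in for the output streams. It is only a sketch of the idea, not how `demux!` is implemented, and `demux_my_enum` is a hypothetical helper name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

enum MyEnum {
    A(i32),
    B(f64),
    C(&'static str),
}

// Hypothetical helper: routes every item to the channel that matches its variant.
fn demux_my_enum(
    input: Vec<MyEnum>,
) -> (Receiver<i32>, Receiver<f64>, Receiver<&'static str>) {
    let (a_tx, a_rx) = channel();
    let (b_tx, b_rx) = channel();
    let (c_tx, c_rx) = channel();

    // The routing runs in the background; the channels are unbounded,
    // so a slow consumer of one output never blocks the others.
    thread::spawn(move || {
        for item in input {
            // `send` fails only if that receiver was dropped; the error is
            // ignored so the remaining outputs keep receiving items.
            match item {
                MyEnum::A(x) => {
                    let _ = a_tx.send(x);
                }
                MyEnum::B(x) => {
                    let _ = b_tx.send(x);
                }
                MyEnum::C(x) => {
                    let _ = c_tx.send(x);
                }
            }
        }
        // All senders are dropped here, which closes every output.
    });

    (a_rx, b_rx, c_rx)
}
```

Collecting each receiver with `iter()` yields the items of one variant in their original relative order and stops once the routing thread has finished, mirroring the `None` at the end of each stream in the example above.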
Multiplexing is the opposite of demultiplexing: given `Stream<T1>, ..., Stream<Tn>`, it produces `Stream<T1 | ... | Tn>`. Again, the process is illustrated below:
That is, once an update from any of the input streams is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate [Tokio task].
[examples/mux.rs]
```rust
use mux_stream::{error_handler, mux};

use std::{collections::HashSet, iter::FromIterator};

use futures::{FutureExt, StreamExt};
use tokio::{stream, sync::mpsc::UnboundedReceiver};

enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

let i32_values = HashSet::from_iter(vec![123, 811]);
let u8_values = HashSet::from_iter(vec![88]);
let str_values = HashSet::from_iter(vec!["Hello", "ABC"]);

// Assumed call shape, mirroring `demux!`: one input stream per variant,
// then the error handler. The sets are cloned because they are compared below.
let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
    stream::iter(i32_values.clone()),
    stream::iter(u8_values.clone()),
    stream::iter(str_values.clone()),
    error_handler::panicking(),
);

// Sort the multiplexed updates back into one set per variant.
let (i32_results, u8_results, str_results) = result
    .fold(
        (HashSet::new(), HashSet::new(), HashSet::new()),
        |(mut i32_results, mut u8_results, mut str_results), update| async move {
            match update {
                MyEnum::A(x) => i32_results.insert(x),
                MyEnum::B(x) => u8_results.insert(x),
                MyEnum::C(x) => str_results.insert(x),
            };

            (i32_results, u8_results, str_results)
        },
    )
    .await;

assert_eq!(i32_results, i32_values);
assert_eq!(u8_results, u8_values);
assert_eq!(str_results, str_values);
```
Hash sets are used here because the order in which updates from different input streams arrive is not preserved.
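The lack of ordering is easy to see in a standard-library analogue of multiplexing, where one OS thread per input (instead of a Tokio task) forwards items into a single shared channel. This is a sketch of the idea only, not how `mux!` is implemented, and `mux_my_enum` is a hypothetical helper name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

#[derive(Debug, PartialEq, Eq, Hash)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

// Hypothetical helper: wraps every input item in its variant and
// forwards it into one shared output channel.
fn mux_my_enum(a: Vec<i32>, b: Vec<u8>, c: Vec<&'static str>) -> Receiver<MyEnum> {
    let (tx, rx) = channel();
    let (tx_a, tx_b, tx_c) = (tx.clone(), tx.clone(), tx);

    // The three forwarders run concurrently, so items of different
    // variants may interleave in any order on the output.
    thread::spawn(move || {
        for x in a {
            let _ = tx_a.send(MyEnum::A(x));
        }
    });
    thread::spawn(move || {
        for x in b {
            let _ = tx_b.send(MyEnum::B(x));
        }
    });
    thread::spawn(move || {
        for x in c {
            let _ = tx_c.send(MyEnum::C(x));
        }
    });

    // The output closes once all three forwarders have dropped their senders.
    rx
}
```

Comparing the output as a `HashSet<MyEnum>` is therefore the robust check: the set of items is deterministic, while their interleaving across inputs is not.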
Q: Is only Tokio supported now?
A: Yes. I have no plans yet to support other asynchronous runtimes.