This crate empahises the [first-class] nature of [asynchronous streams] in Rust by deriving the value construction & pattern matching operations from [ADTs], depicted by the following correspondence:
| ADTs | Streams | |----------|----------| | [Value construction] | [Multiplexing] | | [Pattern matching] | [Demultiplexing] |
Copy this into your Cargo.toml
:
toml
[dependencies]
mux-stream = "0.3"
tokio = "1.6"
tokio-stream = "0.1"
futures = "0.3"
In many problem domains, we encounter the need to process incoming hierarchical structures. Suppose you're writing a social network, and the following kinds of updates might come at any moment:
In terms of Rust, you might want to express such updates via [sum types]:
```rust enum UserReq { SendMsg(SendMsgReq), Follow(FollowReq), MuteFriend(MuteFriendReq) }
enum SendMsgReq { Photo(...), Video(...), Text(...) }
struct FollowReq { ... }
enum MuteFriendReq { Forever(...), ForInterval(...) } ```
This is where the story begins: now you need to process user requests. Let's formulate some general requirements of requests-processing code:
This crate addresses all of the aforementioned requirements. The approach is based upon functional asynchronous: we augment asynchronous data streams with [pattern matching]. Your code would reflect the following structure (concerning with the example of a social network):
(Note the similarities with the [chain-of-responsibility pattern].)
That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them into processors of lower layers, and hence addressing the single-responsibility principle. What is more, you're able to use all the power of [stream adaptors], which let you deal with updates as with chains, not as with single objects, declaratively.
The sections below are dedicated to demultiplexing and multiplexing separately. See also [examples/admin_panel.rs
], an elaborated demonstration of the most prominent aspects of the paradigm.
Given Stream<T1 | ... | Tn>
, demultiplexing produces Stream<T1>, ..., Stream<Tn>
. See the illustration below, in which every circle is an item of a stream and has a type (its colour):
That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate [Tokio task]. No output stream can slow down another one.
[examples/demux.rs
]
```rust
use muxstream::{demux, errorhandler};
use futures::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream;
async fn main() { #[derive(Debug)] enum MyEnum { A(i32), B(f64), C(&'static str), }
let stream = tokio_stream::iter(vec![
MyEnum::A(123),
MyEnum::B(24.241),
MyEnum::C("Hello"),
MyEnum::C("ABC"),
MyEnum::A(811),
]);
let (i32_stream, f64_stream, str_stream) =
demux!(MyEnum { A, B, C })(stream, error_handler::panicking());
let mut i32_stream = UnboundedReceiverStream::new(i32_stream);
let mut f64_stream = UnboundedReceiverStream::new(f64_stream);
let mut str_stream = UnboundedReceiverStream::new(str_stream);
assert_eq!(i32_stream.next().await, Some(123));
assert_eq!(i32_stream.next().await, Some(811));
assert_eq!(i32_stream.next().await, None);
assert_eq!(f64_stream.next().await, Some(24.241));
assert_eq!(f64_stream.next().await, None);
assert_eq!(str_stream.next().await, Some("Hello"));
assert_eq!(str_stream.next().await, Some("ABC"));
assert_eq!(str_stream.next().await, None);
} ```
Multiplexing is the opposite of demultiplexing: given Stream<T1>, ..., Stream<Tn>
, it produces Stream<T1 | ... | Tn>
. Again, the process is illustrated below:
That is, once an update from any input streams is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate [Tokio task].
[examples/mux.rs
]
```rust
use muxstream::{errorhandler, mux};
use std::{collections::HashSet, iter::FromIterator};
use futures::StreamExt; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream;
enum MyEnum { A(i32), B(u8), C(&'static str), }
async fn main() { let i32values = HashSet::fromiter(vec![123, 811]); let u8values = HashSet::fromiter(vec![88]); let strvalues = HashSet::fromiter(vec!["Hello", "ABC"]);
let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
tokio_stream::iter(i32_values.clone()),
tokio_stream::iter(u8_values.clone()),
tokio_stream::iter(str_values.clone()),
error_handler::panicking(),
);
let (i32_results, u8_results, str_results) = UnboundedReceiverStream::new(result)
.fold(
(HashSet::new(), HashSet::new(), HashSet::new()),
|(mut i32_results, mut u8_results, mut str_results), update| async move {
match update {
MyEnum::A(x) => i32_results.insert(x),
MyEnum::B(x) => u8_results.insert(x),
MyEnum::C(x) => str_results.insert(x),
};
(i32_results, u8_results, str_results)
},
)
.await;
assert_eq!(i32_results, i32_values);
assert_eq!(u8_results, u8_values);
assert_eq!(str_results, str_values);
} ```
Hash sets are used here owing to the obvious absence of order preservation of updates from input streams.
Q: Is only Tokio supported now?
A: Yes. I have no plans yet to support other asynchronous runtimes.