mux-stream


This crate emphasises the [first-class] nature of [asynchronous streams] in Rust by deriving the value construction and pattern matching operations from [ADTs], as summarised by the following correspondence:

| ADTs | Streams |
|----------|----------|
| [Value construction] | [Multiplexing] |
| [Pattern matching] | [Demultiplexing] |
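For readers less familiar with the ADT column, here is a minimal std-only sketch of value construction and pattern matching on a Rust enum. The names `Update`, `construct` and `describe` are hypothetical and chosen for illustration only; they are not part of mux-stream.

```rust
/// A hypothetical sum type with two variants.
#[derive(Debug, PartialEq)]
enum Update {
    Number(i32),
    Text(&'static str),
}

/// Value construction: wrap payloads into variants of the sum type.
fn construct() -> Vec<Update> {
    vec![Update::Number(1), Update::Text("hi")]
}

/// Pattern matching: branch on the variant and take its payload apart.
fn describe(update: &Update) -> String {
    match update {
        Update::Number(n) => format!("number {}", n),
        Update::Text(s) => format!("text {}", s),
    }
}
```

Roughly speaking, multiplexing plays the role of `construct` for whole streams, and demultiplexing plays the role of the `match` in `describe`.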


Installation

Copy this into your Cargo.toml:

```toml
[dependencies]
mux-stream = "0.3"
tokio = "1.6"
tokio-stream = "0.1"
futures = "0.3"
```

Motivation

In many problem domains, we encounter the need to process incoming hierarchical structures. Suppose you're writing a social network, and the following kinds of updates might come at any moment:


In terms of Rust, you might want to express such updates via [sum types]:

```rust
enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
    MuteFriend(MuteFriendReq),
}

enum SendMsgReq {
    Photo(...),
    Video(...),
    Text(...),
}

struct FollowReq {
    ...
}

enum MuteFriendReq {
    Forever(...),
    ForInterval(...),
}
```

This is where the story begins: now you need to process user requests. Let's formulate some general requirements of request-processing code:

This crate addresses all of the aforementioned requirements. The approach is functional and asynchronous: we augment asynchronous data streams with [pattern matching]. Continuing the social network example, your code would reflect the following structure:


(Note the similarities with the [chain-of-responsibility pattern].)

That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them to the processors of lower layers, thereby adhering to the single-responsibility principle. What is more, you can use all the power of [stream adaptors], which let you deal with updates declaratively, as chains rather than as single objects.
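The layering described above can be sketched without any asynchronous machinery. The following is a simplified, synchronous illustration that uses `Vec` in place of streams. The types `Request` and `Message`, their payloads, and the three `process_*` functions are hypothetical and invented for this sketch; they are neither the request types from the social network example nor part of mux-stream.

```rust
/// Hypothetical top-level request type (payloads are invented for this sketch).
#[derive(Debug)]
enum Request {
    Message(Message),
    Follow(u32),
}

/// Hypothetical second-layer type.
#[derive(Debug)]
enum Message {
    Text(String),
    Photo(Vec<u8>),
}

/// Top layer: splits requests by variant and hands each group to the
/// processor responsible for it.
fn process_requests(requests: Vec<Request>) -> Vec<String> {
    let mut messages = Vec::new();
    let mut follows = Vec::new();
    for request in requests {
        match request {
            Request::Message(m) => messages.push(m),
            Request::Follow(user_id) => follows.push(user_id),
        }
    }

    let mut log = process_messages(messages);
    log.extend(process_follows(follows));
    log
}

/// Second layer: matches on the message kind and handles each one.
fn process_messages(messages: Vec<Message>) -> Vec<String> {
    messages
        .into_iter()
        .map(|message| match message {
            Message::Text(text) => format!("text: {}", text),
            Message::Photo(bytes) => format!("photo: {} bytes", bytes.len()),
        })
        .collect()
}

/// Leaf processor: works on a whole batch of follow requests at once.
fn process_follows(user_ids: Vec<u32>) -> Vec<String> {
    user_ids.into_iter().map(|id| format!("follow: {}", id)).collect()
}
```

Each function only knows about its own layer, so a new kind of request should only require changes in the function that matches on the enum it was added to.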

The sections below are dedicated to demultiplexing and multiplexing separately. See also [examples/admin_panel.rs], an elaborate demonstration of the most prominent aspects of the paradigm.

Demultiplexing

Given `Stream<T1 | ... | Tn>`, demultiplexing produces `Stream<T1>, ..., Stream<Tn>`. See the illustration below, in which every circle is an item of a stream and has a type (its colour):

That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate [Tokio task]. No output stream can slow down another one.
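To make the routing concrete, here is a rough std-only analogy that uses threads and `std::sync::mpsc` channels instead of Tokio tasks and asynchronous streams. It is not how mux-stream is implemented; the `Event` type and the `demux_sketch` function are hypothetical names introduced for this sketch.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// A hypothetical input type with one variant per output channel.
#[derive(Debug)]
enum Event {
    A(i32),
    B(f64),
}

/// Routes every item of `input` into the channel that matches its variant.
/// The routing runs in its own thread, loosely mirroring the separate task
/// described above.
fn demux_sketch(input: Vec<Event>) -> (Receiver<i32>, Receiver<f64>) {
    // `channel()` is unbounded, so a slow consumer of one output
    // does not block items destined for the other.
    let (a_tx, a_rx) = channel();
    let (b_tx, b_rx) = channel();

    thread::spawn(move || {
        for item in input {
            // A send only fails if the receiver was dropped; this sketch
            // simply ignores that case.
            match item {
                Event::A(x) => {
                    let _ = a_tx.send(x);
                }
                Event::B(x) => {
                    let _ = b_tx.send(x);
                }
            }
        }
        // Both senders are dropped here, which closes the output channels.
    });

    (a_rx, b_rx)
}
```

Within each output channel the items keep the order they had in the input, and each receiver sees the end of its channel once the routing thread finishes.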

Example

[examples/demux.rs]
```rust
use mux_stream::{demux, error_handler};

use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    #[derive(Debug)]
    enum MyEnum {
        A(i32),
        B(f64),
        C(&'static str),
    }

    let stream = tokio_stream::iter(vec![
        MyEnum::A(123),
        MyEnum::B(24.241),
        MyEnum::C("Hello"),
        MyEnum::C("ABC"),
        MyEnum::A(811),
    ]);

    // One output per listed variant, in the same order as in the macro call.
    let (i32_stream, f64_stream, str_stream) =
        demux!(MyEnum { A, B, C })(stream, error_handler::panicking());

    // Wrap the receivers so that they can be used as streams.
    let mut i32_stream = UnboundedReceiverStream::new(i32_stream);
    let mut f64_stream = UnboundedReceiverStream::new(f64_stream);
    let mut str_stream = UnboundedReceiverStream::new(str_stream);

    assert_eq!(i32_stream.next().await, Some(123));
    assert_eq!(i32_stream.next().await, Some(811));
    assert_eq!(i32_stream.next().await, None);

    assert_eq!(f64_stream.next().await, Some(24.241));
    assert_eq!(f64_stream.next().await, None);

    assert_eq!(str_stream.next().await, Some("Hello"));
    assert_eq!(str_stream.next().await, Some("ABC"));
    assert_eq!(str_stream.next().await, None);
}
```

Multiplexing

Multiplexing is the opposite of demultiplexing: given `Stream<T1>, ..., Stream<Tn>`, it produces `Stream<T1 | ... | Tn>`. Again, the process is illustrated below:

That is, once an update from any input stream is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate [Tokio task].
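As a rough std-only analogy (threads and `std::sync::mpsc` channels instead of Tokio tasks and asynchronous streams), multiplexing can be sketched as below. This is not how mux-stream is implemented; the `Merged` type and the `mux_sketch` function are hypothetical names introduced for this sketch, and using one thread per input is a choice made here for simplicity.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// A hypothetical output type with one variant per input.
#[derive(Debug, PartialEq, Eq, Hash)]
enum Merged {
    A(i32),
    B(&'static str),
}

/// Wraps items from two inputs into `Merged` and forwards them to a single
/// output channel. Each input is drained in its own thread, so the relative
/// order of `A` and `B` items in the output is not defined.
fn mux_sketch(a_input: Vec<i32>, b_input: Vec<&'static str>) -> Receiver<Merged> {
    let (tx, rx) = channel();
    let tx_b = tx.clone();

    thread::spawn(move || {
        for x in a_input {
            // A send only fails if the receiver was dropped; ignored here.
            let _ = tx.send(Merged::A(x));
        }
    });
    thread::spawn(move || {
        for x in b_input {
            let _ = tx_b.send(Merged::B(x));
        }
    });

    // The output closes once both threads finish and drop their senders.
    rx
}
```

Because the interleaving of the two inputs can differ between runs, a caller that checks the output should compare it as a set or per variant rather than as an ordered sequence.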

Example

[examples/mux.rs]
```rust
use mux_stream::{error_handler, mux};

use std::{collections::HashSet, iter::FromIterator};

use futures::StreamExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

#[tokio::main]
async fn main() {
    let i32_values = HashSet::from_iter(vec![123, 811]);
    let u8_values = HashSet::from_iter(vec![88]);
    let str_values = HashSet::from_iter(vec!["Hello", "ABC"]);

    // One input stream per listed variant, in the same order as in the macro call.
    let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
        tokio_stream::iter(i32_values.clone()),
        tokio_stream::iter(u8_values.clone()),
        tokio_stream::iter(str_values.clone()),
        error_handler::panicking(),
    );

    // Sort the multiplexed updates back into one set per variant.
    let (i32_results, u8_results, str_results) = UnboundedReceiverStream::new(result)
        .fold(
            (HashSet::new(), HashSet::new(), HashSet::new()),
            |(mut i32_results, mut u8_results, mut str_results), update| async move {
                match update {
                    MyEnum::A(x) => i32_results.insert(x),
                    MyEnum::B(x) => u8_results.insert(x),
                    MyEnum::C(x) => str_results.insert(x),
                };

                (i32_results, u8_results, str_results)
            },
        )
        .await;

    assert_eq!(i32_results, i32_values);
    assert_eq!(u8_results, u8_values);
    assert_eq!(str_results, str_values);
}
```

Hash sets are used here because the relative order of updates coming from different input streams is not preserved.

FAQ

Q: Is only Tokio supported now?

A: Yes. I have no plans yet to support other asynchronous runtimes.