mux-stream

This crate emphasises the [first-class] nature of [asynchronous streams] in Rust by deriving the value construction & pattern matching operations from [ADTs], as depicted by the following correspondence:

| ADTs | Streams |
|----------|----------|
| [Value construction] | [Multiplexing] |
| [Pattern matching] | [Demultiplexing] |

Installation

Copy this into your Cargo.toml:

```toml
[dependencies]
mux-stream = "0.2"
tokio = "0.3"
futures = "0.3"
```

Motivation

In many problem domains, we encounter the need to process incoming hierarchical structures. Suppose you're writing a social network, and the following kinds of updates might come at any moment:


In terms of Rust, you might want to express such updates via [sum types]:

```rust
enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
    MuteFriend(MuteFriendReq),
}

enum SendMsgReq {
    Photo(...),
    Video(...),
    Text(...),
}

struct FollowReq {
    ...
}

enum MuteFriendReq {
    Forever(...),
    ForInterval(...),
}
```

This is where the story begins: now you need to process user requests. Let's formulate some general requirements for request-processing code:

This crate addresses all of the aforementioned requirements. The approach is based upon functional asynchronous programming: we augment asynchronous data streams with [pattern matching]. Your code would reflect the following structure (using the social network example):


(Note the similarities with the [chain-of-responsibility pattern].)

That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them into processors of lower layers, thereby adhering to the single-responsibility principle. What is more, you can use the full power of [stream adaptors], which let you handle updates declaratively as chains rather than as single objects.

The sections below are dedicated to demultiplexing and multiplexing separately. See also [examples/admin_panel.rs], an elaborated demonstration of the most prominent aspects of the paradigm.

Demultiplexing

Given Stream<T1 | ... | Tn>, demultiplexing produces Stream<T1>, ..., Stream<Tn>. See the illustration below, in which every circle is an item of a stream and has a type (its colour):

That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate [Tokio task]. No output stream can slow down another one.
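The routing described above can be sketched without any dependencies. The following is a minimal illustration, not the crate's implementation: the `Update` enum and the `demux_sketch` function are hypothetical names, and an OS thread with `std::sync::mpsc` channels stands in for a Tokio task with asynchronous streams.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical update type, used only for this sketch.
#[derive(Debug, PartialEq)]
enum Update {
    Int(i32),
    Float(f64),
    Text(&'static str),
}

// Routes every item of `input` into the channel that matches its variant.
// The routing loop runs on its own thread (an assumption made to stay
// dependency-free; the crate itself uses a Tokio task).
fn demux_sketch<I>(input: I) -> (Receiver<i32>, Receiver<f64>, Receiver<&'static str>)
where
    I: IntoIterator<Item = Update> + Send + 'static,
{
    let (int_tx, int_rx) = channel();
    let (float_tx, float_rx) = channel();
    let (text_tx, text_rx) = channel();

    thread::spawn(move || {
        for update in input {
            // A send fails only when the receiver was dropped; this sketch
            // ignores that case instead of calling an error handler.
            match update {
                Update::Int(x) => {
                    let _ = int_tx.send(x);
                }
                Update::Float(x) => {
                    let _ = float_tx.send(x);
                }
                Update::Text(x) => {
                    let _ = text_tx.send(x);
                }
            }
        }
        // The senders are dropped here, which closes all three outputs.
    });

    (int_rx, float_rx, text_rx)
}
```

Because the channels in this sketch are unbounded, a consumer that reads one output slowly does not block items destined for the other outputs.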

Example

[examples/demux.rs]
```rust
use mux_stream::{demux, error_handler};

use futures::{future::FutureExt, StreamExt};
use tokio::stream;

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(f64),
    C(&'static str),
}

// The statements below must run inside an async context (e.g. a Tokio runtime).
let stream = stream::iter(vec![
    MyEnum::A(123),
    MyEnum::B(24.241),
    MyEnum::C("Hello"),
    MyEnum::C("ABC"),
    MyEnum::A(811),
]);

// One output stream per listed variant, in the same order.
let (mut i32_stream, mut f64_stream, mut str_stream) =
    demux!(MyEnum { A, B, C })(stream, error_handler::panicking());

assert_eq!(i32_stream.next().await, Some(123));
assert_eq!(i32_stream.next().await, Some(811));
assert_eq!(i32_stream.next().await, None);

assert_eq!(f64_stream.next().await, Some(24.241));
assert_eq!(f64_stream.next().await, None);

assert_eq!(str_stream.next().await, Some("Hello"));
assert_eq!(str_stream.next().await, Some("ABC"));
assert_eq!(str_stream.next().await, None);
```

Multiplexing

Multiplexing is the opposite of demultiplexing: given Stream<T1>, ..., Stream<Tn>, it produces Stream<T1 | ... | Tn>. Again, the process is illustrated below:

That is, once an update from any input stream is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate [Tokio task].
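The merging step can be sketched in the same dependency-free way. This is a hedged illustration rather than the crate's implementation: `Update` and `mux_sketch` are hypothetical names, and OS threads with a shared `std::sync::mpsc` channel stand in for Tokio tasks and asynchronous streams.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical sum type that wraps the items of every input.
#[derive(Debug, PartialEq, Eq, Hash)]
enum Update {
    Int(i32),
    Byte(u8),
    Text(&'static str),
}

// Forwards the items of all three inputs into a single output channel,
// wrapping each item in the variant that corresponds to its source.
// Each input is drained on its own thread, so the inputs interleave freely.
fn mux_sketch(ints: Vec<i32>, bytes: Vec<u8>, texts: Vec<&'static str>) -> Receiver<Update> {
    let (tx, rx) = channel();

    let int_tx = tx.clone();
    thread::spawn(move || {
        for x in ints {
            let _ = int_tx.send(Update::Int(x));
        }
    });

    let byte_tx = tx.clone();
    thread::spawn(move || {
        for x in bytes {
            let _ = byte_tx.send(Update::Byte(x));
        }
    });

    thread::spawn(move || {
        for x in texts {
            let _ = tx.send(Update::Text(x));
        }
    });

    // The output closes once all three sender handles have been dropped.
    rx
}
```

The relative order of items from different inputs depends on thread scheduling, so a caller that checks the output should compare it as a set rather than as a sequence.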

Example

[examples/mux.rs]
```rust
use mux_stream::{error_handler, mux};

use std::{collections::HashSet, iter::FromIterator};

use futures::{FutureExt, StreamExt};
use tokio::{stream, sync::mpsc::UnboundedReceiver};

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

// The statements below must run inside an async context (e.g. a Tokio runtime).
let i32_values = HashSet::from_iter(vec![123, 811]);
let u8_values = HashSet::from_iter(vec![88]);
let str_values = HashSet::from_iter(vec!["Hello", "ABC"]);

// One input stream per listed variant, in the same order.
let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
    stream::iter(i32_values.clone()),
    stream::iter(u8_values.clone()),
    stream::iter(str_values.clone()),
    error_handler::panicking(),
);

// Sort the multiplexed updates back into one set per variant.
let (i32_results, u8_results, str_results) = result
    .fold(
        (HashSet::new(), HashSet::new(), HashSet::new()),
        |(mut i32_results, mut u8_results, mut str_results), update| async move {
            match update {
                MyEnum::A(x) => i32_results.insert(x),
                MyEnum::B(x) => u8_results.insert(x),
                MyEnum::C(x) => str_results.insert(x),
            };

            (i32_results, u8_results, str_results)
        },
    )
    .await;

assert_eq!(i32_results, i32_values);
assert_eq!(u8_results, u8_values);
assert_eq!(str_results, str_values);
```

Hash sets are used here because the relative order of updates coming from different input streams is not preserved.

FAQ

Q: Is only Tokio supported now?

A: Yes. I have no plans yet to support other asynchronous runtimes.