demux-stream

Demultiplexing

Demultiplexing a stream S is semantically equivalent to matching each item against its type T and pushing it into the corresponding Stream<T>, e.g.: Stream<Coprod!(A, B, ...)> -> Hlist![Stream<A>, Stream<B>, ...].

Example

```rust
// Imports are omitted...

type MyType = Coprod!(bool, i32, String);

let coproducts = stream::once(async { MyType::inject(123) });

let (mut bools, mut i32s, mut strings) = <_>::demux(coproducts).into();

assert_eq!(i32s.next().await, Some(123));
assert_eq!(bools.next().await, None);
assert_eq!(i32s.next().await, None);
assert_eq!(strings.next().await, None);
```
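To make the "match each item and push it into the right output" idea concrete, here is a minimal synchronous analogy that uses only the standard library. It is a sketch, not how the crate is implemented: the `Message` enum stands in for `Coprod!(bool, i32, String)`, plain `Vec`s stand in for streams, and the name `demux_items` is hypothetical.

```rust
// Hypothetical stand-in for Coprod!(bool, i32, String); not part of the crate.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Bool(bool),
    Int(i32),
    Text(String),
}

// Route every item into the output collection that matches its variant.
// The relative order of items within each output is preserved.
fn demux_items(
    items: impl IntoIterator<Item = Message>,
) -> (Vec<bool>, Vec<i32>, Vec<String>) {
    let (mut bools, mut ints, mut texts) = (Vec::new(), Vec::new(), Vec::new());
    for item in items {
        match item {
            Message::Bool(b) => bools.push(b),
            Message::Int(n) => ints.push(n),
            Message::Text(s) => texts.push(s),
        }
    }
    (bools, ints, texts)
}
```

Calling `demux_items(vec![Message::Int(123)])` leaves the `bool` and `String` outputs empty, which mirrors the `None` results in the asynchronous example above.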

Multiplexing

Multiplexing is the dual of demultiplexing. Given a heterogeneous list of streams H, each item of type T produced by one of the streams is injected as the corresponding variant into a single resulting stream, e.g.: Hlist![Stream<A>, Stream<B>, ...] -> Stream<Coprod!(A, B, ...)>.

Example

```rust
// Imports are omitted...

type MyType = Coprod!(bool, i32, String, ());

let streams = hlist![
    stream::once(async { true }),
    stream::once(async { 123i32 }),
    stream::once(async { "Hello".to_owned() })
];

let mut multiplexed = streams.mux();

assert_eq!(multiplexed.next().await, Some(MyType::inject(true)));
assert_eq!(multiplexed.next().await, Some(MyType::inject(123i32)));
assert_eq!(
    multiplexed.next().await,
    Some(MyType::inject("Hello".to_owned()))
);
assert_eq!(multiplexed.next().await, None);
```
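The reverse direction can be sketched the same way with the standard library alone. This is an illustrative analogy under stated assumptions, not the crate's implementation: `Message` stands in for the coproduct, `Vec`s stand in for streams, `mux_items` is a hypothetical name, and the round-robin order is an arbitrary choice made for the sketch (with real asynchronous streams, the order would depend on when items become ready).

```rust
// Hypothetical stand-in for Coprod!(bool, i32, String); not part of the crate.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Bool(bool),
    Int(i32),
    Text(String),
}

// Merge three typed sources into one sequence of tagged items,
// taking at most one item from each source per round.
fn mux_items(bools: Vec<bool>, ints: Vec<i32>, texts: Vec<String>) -> Vec<Message> {
    let mut bools = bools.into_iter();
    let mut ints = ints.into_iter();
    let mut texts = texts.into_iter();
    let mut out = Vec::new();
    loop {
        let before = out.len();
        // `Option` is iterable, so `extend` appends zero or one item.
        out.extend(bools.next().map(Message::Bool));
        out.extend(ints.next().map(Message::Int));
        out.extend(texts.next().map(Message::Text));
        // Stop once a full round yields nothing: every source is exhausted.
        if out.len() == before {
            break;
        }
    }
    out
}
```

Wrapping each value in its variant plays the role that `MyType::inject` plays in the example above.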

Implementation details

To avoid boilerplate and arrive at a generic solution (at least for streams), I defined (de)multiplexing by induction on types. More precisely (simplified versions):