Demultiplexing a stream `S` is semantically equivalent to matching each item with a type `T` and pushing it into `Stream<T>`, e.g.: `Stream<Coprod!(A, B, ...)>` → `Hlist![Stream<A>, Stream<B>, ...]`.
```rust
// Imports are omitted...

type MyType = Coprod!(bool, i32, String);

// A stream that yields a single coproduct value holding an `i32`.
let coproducts = stream::once(async { MyType::inject(123) });
// One receiving stream per variant of `MyType`.
let (mut bools, mut i32s, mut strings) = <_>::demux(coproducts).into();

assert_eq!(i32s.next().await, Some(123));
assert_eq!(bools.next().await, None);
assert_eq!(i32s.next().await, None);
assert_eq!(strings.next().await, None);
```
Multiplexing is the dual of demultiplexing. Given a heterogeneous list of streams `H`, redirect the items of each stream into the resulting stream as the variant of the corresponding type `T`, e.g.: `Hlist![Stream<A>, Stream<B>, ...]` → `Stream<Coprod!(A, B, ...)>`.
```rust
// Imports are omitted...

type MyType = Coprod!(bool, i32, String, ());

// One input stream per variant type.
let streams = hlist![
    stream::once(async { true }),
    stream::once(async { 123i32 }),
    stream::once(async { "Hello".to_owned() })
];
let mut multiplexed = streams.mux();

assert_eq!(multiplexed.next().await, Some(MyType::inject(true)));
assert_eq!(multiplexed.next().await, Some(MyType::inject(123i32)));
assert_eq!(
    multiplexed.next().await,
    Some(MyType::inject("Hello".to_owned()))
);
assert_eq!(multiplexed.next().await, None);
```
To avoid boilerplate and achieve a generic solution (at least, for streams), I defined (de)multiplexing by induction on types. More precisely (simplified versions):
- `impl Demux for CNil { type Output = HNil; ... }`.
- `impl<L, R: Demux> Demux for Coproduct<L, R> { type Output = HCons<UnboundedReceiver<L>, R::Output>; ... }`.
- `impl Mux for HNil { type Output = Coprod![()]; ... }`.
- `impl<Head, Tail: Mux> Mux for HCons<Head, Tail> { type Output = Coproduct<Head::Item, Tail::Output>; ... }`.
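
To make the induction concrete, here is a minimal synchronous sketch. It is not the actual implementation, only an illustration under stated assumptions: `CNil`, `Coproduct`, `HNil`, and `HCons` are hand-rolled stand-ins for the frunk types so that the code compiles with the standard library alone, `Vec<L>` plays the role of `UnboundedReceiver<L>` and of the input streams, and the names `empty`, `route`, `demux`, and `demo_demux` are hypothetical. The multiplexing half also concatenates its inputs in list order, which a real stream multiplexer would not necessarily do.

```rust
// Hand-rolled stand-ins for frunk's `CNil`, `Coproduct`, `HNil`, and `HCons`.
#[derive(Debug, PartialEq)]
enum CNil {}

#[derive(Debug, PartialEq)]
enum Coproduct<L, R> {
    Inl(L),
    Inr(R),
}

#[derive(Debug, PartialEq)]
struct HNil;

#[derive(Debug, PartialEq)]
struct HCons<H, T> {
    head: H,
    tail: T,
}

// Synchronous analogue of demultiplexing (assumption: `Vec` instead of channels).
trait Demux: Sized {
    type Output;
    // Builds the initially empty output list.
    fn empty() -> Self::Output;
    // Routes one value into the slot that matches its variant.
    fn route(self, out: &mut Self::Output);
}

// Base case: the empty coproduct demultiplexes into the empty list.
impl Demux for CNil {
    type Output = HNil;
    fn empty() -> HNil {
        HNil
    }
    fn route(self, _out: &mut HNil) {
        match self {} // `CNil` has no values, so this is never reached.
    }
}

// Inductive step: handle the head variant here, delegate the rest to `R`.
impl<L, R: Demux> Demux for Coproduct<L, R> {
    type Output = HCons<Vec<L>, R::Output>;
    fn empty() -> Self::Output {
        HCons { head: Vec::new(), tail: R::empty() }
    }
    fn route(self, out: &mut Self::Output) {
        match self {
            Coproduct::Inl(item) => out.head.push(item),
            Coproduct::Inr(rest) => rest.route(&mut out.tail),
        }
    }
}

// Hypothetical driver: routes every item into the output list.
fn demux<C: Demux>(items: impl IntoIterator<Item = C>) -> C::Output {
    let mut out = C::empty();
    for item in items {
        item.route(&mut out);
    }
    out
}

// Synchronous analogue of multiplexing (concatenates instead of interleaving).
trait Mux {
    type Output;
    fn mux(self) -> Vec<Self::Output>;
}

// Base case, mirroring `Coprod![()]` from the list above: yields no items.
impl Mux for HNil {
    type Output = Coproduct<(), CNil>;
    fn mux(self) -> Vec<Self::Output> {
        Vec::new()
    }
}

// Inductive step: tag head items with `Inl`, shift tail items under `Inr`.
impl<H, T: Mux> Mux for HCons<Vec<H>, T> {
    type Output = Coproduct<H, T::Output>;
    fn mux(self) -> Vec<Self::Output> {
        let mut out: Vec<Self::Output> =
            self.head.into_iter().map(Coproduct::Inl).collect();
        out.extend(self.tail.mux().into_iter().map(Coproduct::Inr));
        out
    }
}

// Usage: the expanded form of `Coprod!(bool, i32, String)`.
type Item = Coproduct<bool, Coproduct<i32, Coproduct<String, CNil>>>;

fn demo_demux() -> <Item as Demux>::Output {
    let items: Vec<Item> = vec![
        Coproduct::Inr(Coproduct::Inl(123)),
        Coproduct::Inl(true),
        Coproduct::Inr(Coproduct::Inr(Coproduct::Inl("Hello".to_owned()))),
    ];
    demux(items)
}
```

In both traits the compiler unrolls the recursion at compile time: each `impl` peels off one variant (or one list element) and defers the rest to the bound on `R` or `T`, so no per-type boilerplate is needed for a coproduct or list of any length.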