`rxRust`?

`rxRust` is for streaming single data, not for complex streaming of `ReactiveX`. Also, there is no error recovery, and it is different from general `ReactiveX`.

Therefore, something that can be written easily with `another-rxrust`, as shown below, becomes extremely difficult with `rxRust`.
```rust
use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};

fn main() {
    fn ob() -> Observable<'static, i32> {
        Observable::create(|s| {
            s.next(100);
            s.next(200);
            s.complete();
        })
    }

    observables::from_iter(vec![1, 2, 3, 4, 5].into_iter())
        .observe_on(schedulers::new_thread_scheduler())
        .flat_map(|x| match x {
            1 => observables::empty(),
            2 => observables::just(x),
            3 => ob().map(move |y| (y + x)),
            4 => observables::error(RxError::new(anyhow!("err"))),
            _ => observables::never(),
        })
        .map(|x| format!("{}", x))
        .on_error_resume_next(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
        .subscribe(
            |x| {
                println!("next {}", x);
            },
            |e| {
                println!("error {:}", e.error);
            },
            || {
                println!("complete");
            },
        );

    thread::sleep(time::Duration::from_millis(600));
}

// next 2
// next 103
// next 203
// next resume err 100
// next resume err 200
// complete
```
Based on the problems of `rxRust`, `another-rxrust` has the following implementation policy.
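
- Values that can be emitted are `Clone + Send + Sync` only.
- Use `move` to emit values as much as possible.
- Functions passed as arguments are `Fn() + Send + Sync` only.
- Errors use `std::any`. If `features=["anyhow"]` is enabled, use `anyhow::Error`.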
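
To make the bounds named in the policy above more concrete, here is a minimal sketch that uses only the standard library. `emit_on_new_thread` and `AnyError` are hypothetical names invented for this illustration and are not part of `another-rxrust`; the sketch only assumes that values and callbacks have to cross thread boundaries, and that errors are stored in a type-erased way via `std::any::Any`.

```rust
use std::any::Any;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper (not an another-rxrust API): delivers each value to
// `on_next` from a newly spawned thread.
// - `T: Clone + Send + Sync` lets values be copied and shared across threads.
// - `F: Fn(T) + Send + Sync` lets the callback be called repeatedly from
//   another thread without exclusive access.
fn emit_on_new_thread<T, F>(values: Vec<T>, on_next: F) -> thread::JoinHandle<()>
where
    T: Clone + Send + Sync + 'static,
    F: Fn(T) + Send + Sync + 'static,
{
    thread::spawn(move || {
        for v in values {
            // The value is moved into the callback instead of being borrowed.
            on_next(v);
        }
    })
}

// Hypothetical type-erased error: holds any `Send + Sync` value behind an
// `Arc`, so cloning it is cheap and it can travel between threads.
#[derive(Clone)]
struct AnyError {
    inner: Arc<dyn Any + Send + Sync>,
}

impl AnyError {
    fn new<E: Any + Send + Sync>(e: E) -> Self {
        AnyError { inner: Arc::new(e) }
    }

    // Returns the original error if it has the requested concrete type.
    fn downcast_ref<E: Any>(&self) -> Option<&E> {
        self.inner.downcast_ref::<E>()
    }
}

fn main() {
    // Shared sink that the callback writes into from the worker thread.
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);

    let handle = emit_on_new_thread(vec![1, 2, 3], move |x: i32| {
        sink.lock().unwrap().push(x * 10);
    });
    handle.join().unwrap();
    println!("{:?}", seen.lock().unwrap());

    // The error keeps its concrete type and can be recovered by downcasting.
    let err = AnyError::new("err");
    let copy = err.clone();
    println!("{:?}", copy.downcast_ref::<&str>());
}
```

Because the callback is `Fn` rather than `FnMut`, any state it mutates has to sit behind something like `Arc<Mutex<...>>`, as `sink` does here. That is the practical cost of accepting only `Fn() + Send + Sync` functions.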