Why not `rxRust`?

`rxRust` is designed for streaming single values, not for the complex stream composition that `ReactiveX` offers. It also has no error recovery, which sets it apart from typical `ReactiveX` implementations.

As a result, something that is easy to write with `another-rxrust`, as shown below, becomes extremely difficult with `rxRust`.

```rust
use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};

fn basic() {
  fn ob() -> Observable<'static, i32> {
    Observable::create(|s| {
      s.next(1);
      s.next(2);
      s.next(3);
      s.next(4);
      s.complete();
    })
  }

  ob()
    .observe_on(schedulers::new_thread_scheduler())
    .flat_map(|x| match x {
      1 => observables::empty(),
      2 => observables::just(x),
      3 => ob().map(|x| x + 100),
      4 => observables::error(RxError::new(anyhow!("err"))),
      _ => observables::never(),
    })
    .map(|x| format!("{}", x))
    .on_error_resume_next(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
    .subscribe(
      |x| {
        println!("next {}", x);
      },
      |e| {
        println!("error {:}", e.error);
      },
      || {
        println!("complete");
      },
    );

  thread::sleep(time::Duration::from_millis(500));
}

// next 2
// next 101
// next 102
// next 103
// next 104
// next resume err 1
// next resume err 2
// next resume err 3
// next resume err 4
// complete
```
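
To make the order of the output easier to follow, here is a rough synchronous model of the same pipeline using only the standard library. It is a sketch, not how `another-rxrust` is implemented: `Event`, `source`, `flat_map_step`, and `run` are hypothetical names invented for this illustration, and threading and scheduling are left out entirely.

```rust
/// Hypothetical event type for this sketch (not part of another-rxrust).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Next(String),
    Complete,
}

/// The values emitted by `ob()` in the example above.
fn source() -> Vec<i32> {
    vec![1, 2, 3, 4]
}

/// Models the closure passed to `flat_map`: each input expands to an inner
/// sequence of values, or fails with an error message.
fn flat_map_step(x: i32) -> Result<Vec<i32>, String> {
    match x {
        1 => Ok(vec![]),                                          // empty()
        2 => Ok(vec![x]),                                         // just(x)
        3 => Ok(source().into_iter().map(|v| v + 100).collect()), // ob().map(..)
        4 => Err("err".to_string()),                              // error(..)
        // never() is not reached with this source; modeled here as "no values".
        _ => Ok(vec![]),
    }
}

/// Runs the modeled pipeline and records every event in order.
fn run() -> Vec<Event> {
    let mut events = Vec::new();
    for x in source() {
        match flat_map_step(x) {
            Ok(values) => {
                for v in values {
                    events.push(Event::Next(format!("{}", v)));
                }
            }
            Err(e) => {
                // Models on_error_resume_next: the failed stream is replaced
                // by a fresh `ob()` whose values carry the error message.
                for v in source() {
                    events.push(Event::Next(format!("resume {} {}", e, v)));
                }
                events.push(Event::Complete);
                return events;
            }
        }
    }
    events.push(Event::Complete);
    events
}

fn main() {
    for event in run() {
        match event {
            Event::Next(s) => println!("next {}", s),
            Event::Complete => println!("complete"),
        }
    }
}
```

The point of the model is that the error raised for input `4` never reaches the error handler: the resume step swaps in a new stream, so the subscriber sees only `next` events followed by `complete`.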

Based on these problems with `rxRust`, `another-rxrust` follows the implementation policy below.

- Values that are `Clone + Send + Sync` can be issued.
- Functions passed to operators are `Fn() + Send + Sync` only.
- Errors are `anyhow::Error`.
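
The following standard-library sketch suggests why bounds such as `Clone + Send + Sync` on values and `Fn() + Send + Sync` on callbacks are convenient when items may be delivered from other threads. `broadcast` and `sum_broadcast` are hypothetical helpers written for this illustration, not `another-rxrust` APIs, and the callback here takes one argument (`Fn(T)`) for simplicity. `anyhow::Error` is not shown because it requires an external crate.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: delivers every value to `on_next` from two worker threads.
/// `T: Clone` lets each thread receive its own copy of the values, and
/// `Send + Sync + 'static` lets values and callback cross thread boundaries.
fn broadcast<T, F>(values: Vec<T>, on_next: F)
where
    T: Clone + Send + Sync + 'static,
    F: Fn(T) + Send + Sync + 'static,
{
    // `Fn` (rather than `FnMut` or `FnOnce`) allows shared, repeated calls.
    let on_next = Arc::new(on_next);
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let values = values.clone();
            let on_next = Arc::clone(&on_next);
            thread::spawn(move || {
                for v in values {
                    on_next(v);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
}

/// Sums everything that `broadcast` delivers across both worker threads.
fn sum_broadcast(values: Vec<i32>) -> i32 {
    let total = Arc::new(Mutex::new(0));
    let sink = Arc::clone(&total);
    // State mutated from an `Fn` closure must sit behind a lock.
    broadcast(values, move |x| {
        *sink.lock().unwrap() += x;
    });
    let result = *total.lock().unwrap();
    result
}

fn main() {
    // Each of the two workers delivers 1 + 2 + 3.
    println!("{}", sum_broadcast(vec![1, 2, 3]));
}
```

Restricting callbacks to `Fn` means any state they change has to sit behind something like a `Mutex`, which is the price of being able to call them safely from any scheduler thread.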