`rxRust` is a `Rust` implementation of `ReactiveX`, but it is not well suited to complex combinations of `observable`s. For those who use `ReactiveX` in other languages, such as `rxjs` (TypeScript) or `rxcpp`, `rxRust` is a very difficult library to use.

Therefore, I created `another-rxrust`, because I wanted a library that lets `observable`s be connected in the same way as on other platforms, so that `ReactiveX` can be enjoyed in `Rust`.

In addition, `ReactiveX` may not be the best solution if the goal is to parallelize heavy processing and speed it up. However, `ReactiveX` is one answer for complex combinations of non-blocking I/O and error handling.
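
- Emitted values must be `Clone + Send + Sync` only.
- Use `move` to emit values as much as possible.
- Functions must be `Fn() + Send + Sync` only.
- Errors use `std::any` by default. With `features=["anyhow"]`, `anyhow::Error` is used instead.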
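
The trait bounds listed above can be illustrated with the standard library alone. The following is a minimal sketch, not code from `another-rxrust`: `emit_on_threads` and `collect_demo` are hypothetical names introduced only for this illustration. It shows why a value that is `Clone + Send + Sync` and a handler that is `Fn + Send + Sync` can be handed freely to several threads, which is roughly what a thread-based scheduler needs.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper (not part of another-rxrust): hands a clone of
// `value` to `handler` on each of `n` new threads, then waits for all
// of them to finish.
fn emit_on_threads<T, F>(value: T, n: usize, handler: F)
where
    T: Clone + Send + Sync + 'static,
    F: Fn(T) + Send + Sync + 'static,
{
    // `Fn + Send + Sync` lets one handler be shared behind an `Arc`.
    let handler = Arc::new(handler);
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let v = value.clone(); // `Clone`: every thread gets its own copy
            let h = Arc::clone(&handler);
            thread::spawn(move || (*h)(v)) // `move`: the copy is moved into the thread
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}

// Collects what three threads received through the shared handler.
fn collect_demo() -> Vec<i32> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    emit_on_threads(7, 3, move |x: i32| sink.lock().unwrap().push(x));
    let out = seen.lock().unwrap().clone();
    out
}
```

Calling `collect_demo()` returns three copies of the value, one per thread. A handler that was only `FnMut` or not `Sync` could not be shared this way without extra locking, which is presumably why the policy restricts closures to `Fn() + Send + Sync`.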

Default (`std::any`):

```toml
[dependencies]
another-rxrust = {}
```

With `anyhow::Error`:

```toml
[dependencies]
another-rxrust = {features=["anyhow"]}
```

Creating an error with `std::any`:

```rust
RxError::new(Box::new("any error".to_owned()))
```

Creating an error with `anyhow`:

```rust
RxError::new(anyhow::anyhow!("anyhow error"))
```
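
To make the `std::any` variant more concrete, here is a rough sketch of how a boxed `Any` value can be read back with `downcast_ref`. It uses only the standard library. `AnyError` and `describe` are hypothetical stand-ins for illustration, and this is not how `RxError` itself is necessarily implemented.

```rust
use std::any::Any;

// Hypothetical stand-in for an error type that carries any boxed value.
struct AnyError {
    inner: Box<dyn Any + Send + Sync>,
}

impl AnyError {
    fn new(inner: Box<dyn Any + Send + Sync>) -> Self {
        AnyError { inner }
    }
}

// Tries the payload types we expect and falls back to a generic message.
fn describe(e: &AnyError) -> String {
    if let Some(s) = e.inner.downcast_ref::<String>() {
        s.clone()
    } else if let Some(s) = e.inner.downcast_ref::<&'static str>() {
        s.to_string()
    } else {
        "unknown error".to_owned()
    }
}
```

The trade-off of an `Any`-based error is visible here: the receiver has to know, or guess, the concrete payload type. An `anyhow::Error` carries its own `Display` implementation, so no downcast is needed just to print it.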

```rust
use crate::prelude::*;
use std::{thread, time};

fn basic() {
    // observable creator function
    fn ob() -> Observable<'static, i32> {
        Observable::create(|s| {
            s.next(100);
            s.next(200);
            s.complete();
        })
    }

    observables::from_iter(1..10)
        .observe_on(schedulers::new_thread_scheduler())
        .flat_map(|x| match x {
            1 => observables::empty(),
            2 => observables::just(x),
            3 => ob().map(move |y| (y + x)),
            4 => observables::error(RxError::new(anyhow::anyhow!("anyhow error"))),
            _ => observables::never(),
        })
        .map(|x| format!("{}", x))
        .on_error_resume_next(|e| {
            ob().map(move |x| format!("resume {:} {}", error_to_string(&e), x))
        })
        .subscribe(
            |x| {
                println!("next {}", x);
            },
            |e| {
                println!("error {:?}", e.error);
            },
            || {
                println!("complete");
            },
        );

    thread::sleep(time::Duration::from_millis(500));
}
// [console-results]
// next 2
// next 103
// next 203
// next resume any error 100
// next resume any error 200
// complete
```

```rust
use crate::prelude::*;
use std::{thread, time};

fn main() {
    let sbj = subjects::Subject::new();
    observables::interval(
        time::Duration::from_millis(100),
        schedulers::new_thread_scheduler(),
    )
    .sample(sbj.observable())
    .take(3)
    .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());

    (0..3).for_each(|_| {
        thread::sleep(time::Duration::from_millis(500));
        sbj.next(());
    });
    sbj.complete();
    thread::sleep(time::Duration::from_millis(500));
}
// [console-results (Depends on execution environment)]
// next - 3
// next - 8
// next - 13
// complete
```

```rust
use crate::prelude::*;
use std::{thread, time};

fn main() {
    observables::from_iter(0..10)
        .observe_on(schedulers::new_thread_scheduler())
        .zip(&[
            observables::from_iter(10..20).observe_on(schedulers::new_thread_scheduler()),
            observables::from_iter(20..30).observe_on(schedulers::new_thread_scheduler()),
        ])
        .subscribe(print_next_fmt!("{:?}"), print_error!(), print_complete!());
    thread::sleep(time::Duration::from_millis(1000));
}
// [console-results]
// next - [10, 20, 0]
// next - [11, 21, 1]
// next - [12, 22, 2]
// next - [13, 23, 3]
// next - [14, 24, 4]
// next - [15, 25, 5]
// next - [16, 26, 6]
// next - [17, 27, 7]
// next - [18, 28, 8]
// next - [19, 29, 9]
// complete
```