rxRust
is a Rust
language implementation of ReactiveX
, but it is not suitable for complex combinations of observable
.
For those who use ReactiveX
in other languages such as rxjs(TypeScript)
or rxcpp
, rxRust
is a very difficult library.
Therefore, I created another-rxrust
, thinking that I needed a library that allows observable
to be connected in the same way as other platforms, and that ReactiveX
can be enjoyed in Rust
.
In addition, ReactiveX
may not be the best solution if the purpose is to parallelize heavy processing and speed it up. However, Reactive X
is one answer for complex combinations of non-blocking I/O and error handling.
Clone + Send + Sync
only.move
to emit values as much as possible.Fn() + Send + Sync
only.std::any
.
'static
lifetime.anyhow
.sh
cargo add another-rxrust
or
toml
[dependencies]
another-rxrust = {}
```rust use crate::prelude::*; use std::{thread, time};
fn basic() { // observable creator function fn ob() -> Observable<'static, i32> { Observable::create(|s| { s.next(100); s.next(200); s.complete(); }) }
observables::fromiter(1..10) .observeon(schedulers::newthreadscheduler()) .flatmap(|x| match x { 1 => observables::empty(), 2 => observables::just(x), 3 => ob().map(move |y| (y + x)), 4 => observables::error(RxError::fromerror("some error")), _ => observables::never(), }) .map(|x| format!("{}", x)) .onerrorresumenext(|e| { ob().map(move |x| format!("resume {:?} {}", e.castref::<&str>(), x)) }) .subscribe( |x| { println!("next {}", x); }, |e| { println!("error {:?}", e.any_ref()); }, || { println!("complete"); }, );
thread::sleep(time::Duration::from_millis(500)); } // [console-results] // next 2 // next 103 // next 203 // next resume "some error" 100 // next resume "some error" 200 // complete ```
```rust use crate::prelude::*; use std::{thread, time};
fn main() { let sbj = subjects::Subject::new(); observables::interval( time::Duration::frommillis(100), schedulers::newthreadscheduler(), ) .sample(sbj.observable()) .take(3) .subscribe(printnextfmt!("{}"), printerror!(), print_complete!());
(0..3).foreach(|| { thread::sleep(time::Duration::frommillis(500)); sbj.next(()); }); sbj.complete(); thread::sleep(time::Duration::frommillis(500)); } // [console-results (Depends on execution environment)] // next - 3 // next - 8 // next - 13 // complete ```
```rust use crate::prelude::*; use std::{thread, time};
fn main() { observables::fromiter(0..10) .observeon(schedulers::newthreadscheduler()) .zip(&[ observables::fromiter(10..20).observeon(schedulers::newthreadscheduler()), observables::fromiter(20..30).observeon(schedulers::newthreadscheduler()), ]) .subscribe(printnextfmt!("{:?}"), printerror!(), printcomplete!()); thread::sleep(time::Duration::from_millis(1000)); } // [console-results] // next - [0, 10, 20] // next - [1, 11, 21] // next - [2, 12, 22] // next - [3, 13, 23] // next - [4, 14, 24] // next - [5, 15, 25] // next - [6, 16, 26] // next - [7, 17, 27] // next - [8, 18, 28] // next - [9, 19, 29] // complete ```