another-rxrust

why new implementation?

rxRust is a Rust language implementation of ReactiveX, but it is not suitable for complex combinations of observable. For those who use ReactiveX in other languages such as rxjs(TypeScript) or rxcpp, rxRust is a very difficult library.

Therefore, I created another-rxrust, thinking that I needed a library that allows observable to be connected in the same way as other platforms, and that ReactiveX can be enjoyed in Rust.

In addition, ReactiveX may not be the best solution if the purpose is to parallelize heavy processing and speed it up. However, Reactive X is one answer for complex combinations of non-blocking I/O and error handling.

Implementation policy

Implementation status

See implementation status.

Usage

default

toml [dependencies] another-rxrust = {}

use anyhow::Error

toml [dependencies] another-rxrust = {features=["anyhow"]}

Samples

create error instance

std::any

rust RxError::new(Box::new("any error".to_owned()))

anyhow

rust RxError::new(anyhow::anyhow!("anyhow error"))

basic

```rust use crate::prelude::*; use std::{thread, time};

[test]

fn basic() { // observable creator function fn ob() -> Observable<'static, i32> { Observable::create(|s| { s.next(100); s.next(200); s.complete(); }) }

observables::fromiter(1..10) .observeon(schedulers::newthreadscheduler()) .flatmap(|x| match x { 1 => observables::empty(), 2 => observables::just(x), 3 => ob().map(move |y| (y + x)), 4 => observables::error(RxError::new(anyhow::anyhow!("anyhow error"))), _ => observables::never(), }) .map(|x| format!("{}", x)) .onerrorresumenext(|e| ob().map(move |x| format!("resume {:} {}", errortostring(&e), x))) .subscribe( |x| { println!("next {}", x); }, |e| { println!("error {:?}", e.error); }, || { println!("complete"); }, );

thread::sleep(time::Duration::from_millis(500)); } // [console-results] // next 2 // next 103 // next 203 // next resume any error 100 // next resume any error 200 // complete ```

sample operator & subject

```rust use crate::prelude::*; use std::{thread, time};

fn main() { let sbj = subjects::Subject::new(); observables::interval( time::Duration::frommillis(100), schedulers::newthreadscheduler(), ) .sample(sbj.observable()) .take(3) .subscribe(printnextfmt!("{}"), printerror!(), print_complete!());

(0..3).foreach(|| { thread::sleep(time::Duration::frommillis(500)); sbj.next(()); }); sbj.complete(); thread::sleep(time::Duration::frommillis(500)); } // [console-results (Depends on execution environment)] // next - 3 // next - 8 // next - 13 // complete ```

zip

```rust use crate::prelude::*; use std::{thread, time};

fn main() { observables::fromiter(0..10) .observeon(schedulers::newthreadscheduler()) .zip(&[ observables::fromiter(10..20).observeon(schedulers::newthreadscheduler()), observables::fromiter(20..30).observeon(schedulers::newthreadscheduler()), ]) .subscribe(printnextfmt!("{:?}"), printerror!(), printcomplete!()); thread::sleep(time::Duration::from_millis(1000)); } // [console-results] // next - [10, 20, 0] // next - [11, 21, 1] // next - [12, 22, 2] // next - [13, 23, 3] // next - [14, 24, 4] // next - [15, 25, 5] // next - [16, 26, 6] // next - [17, 27, 7] // next - [18, 28, 8] // next - [19, 29, 9] // complete ```