RxRs - Reactive Extensions for Rust
```rust (Rust Nightly 1.25+)
fn timer() { println!("cur thread {:?}", thread::current().id());
rx::timer(0, Some(10), NewThreadScheduler::get())
.skip(3)
.filter(|i| i % 2 == 0)
.take(3)
.map(|v| format!("-{}-", v))
.observe_on(NewThreadScheduler::get())
.subf(
|v| println!("{} on {:?}", v, thread::current().id()),
(),
| | println!("complete on {:?}", thread::current().id())
);
thread::sleep(::std::time::Duration::from_millis(2000));
}
Output:
bash
cur thread ThreadId(1)
-4- on ThreadId(2)
-6- on ThreadId(2)
-8- on ThreadId(2)
complete on ThreadId(2)
```
```rust //(lib for this demo is also a WIP)
let slider = Scale::newwithrange(Orientation::Horizontal, 0.0, 100.0, 1.0);
// create an Observable
from the slider's value_changed
event
event!(slider.connectvaluechanged, it => it.getvalue())
.startwith(0.0) // events (signals) don't emit an initial value, so we give it one
.debounce(250, GtkScheduler::get()) // debounce with 250ms to limit input frequency
.observeon(NewThreadScheduler::get()) // change to a worker thread
.map(|v| format!("*{}*", v*v)) // do hard (or blocking) jobs on that thread
.observeon(GtkScheduler::get()) // schedule results back to main thread ...
.subf( // ... for displaying
byclone!(btn => move |v:String| btn.set_label(&v) )
);
```
``` src ├── behavioursubject.rs ├── connectableobservable.rs ├── fac │  ├── create.rs │  ├── mod.rs │  └── timer.rs ├── lib.rs ├── observable.rs ├── op │  ├── concat.rs │  ├── debounce.rs │  ├── filter.rs │  ├── map.rs │  ├── mod.rs │  ├── multicast.rs │  ├── observeon.rs │  ├── publish.rs │  ├── skip.rs │  ├── subon.rs │  ├── take.rs │  ├── takeuntil.rs │  └── tap.rs ├── scheduler.rs ├── subject.rs ├── subscriber.rs ├── unsubref.rs └── util ├── arccell.rs ├── atomicoption.rs └── mod.rs
```
Scheduler
sScheduler
s