another-rxrust
Why not rxRust
?
I think rxRust
is a great Rust
implementation of ReactiveX
. However, when rxRust
combines observable
in a slightly complicated way, rust
peculiar difficulties are exposed.
Therefore, I implemented ReactiveX
in a different way than rxRust
, and created another-rxrust
that can be easily described even if the following observable
is combined in a complicated manner.
This implementation is not a panacea. Rust
sacrifices memory efficiency, speed, and much more.
If you want performance, I think you should use rxRust
.
```rust
use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};
fn main() {
fn ob() -> Observable<'static, i32> {
Observable::create(|s| {
s.next(100);
s.next(200);
s.complete();
})
}
observables::fromiter(1..10)
.observeon(schedulers::newthreadscheduler())
.flatmap(|x| match x {
1 => observables::empty(),
2 => observables::just(x),
3 => ob().map(move |y| (y + x)),
4 => observables::error(RxError::new(anyhow!("err"))),
_ => observables::never(),
})
.map(|x| format!("{}", x))
.onerrorresumenext(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
.subscribe(
|x| {
println!("next {}", x);
},
|e| {
println!("error {:}", e.error);
},
|| {
println!("complete");
},
);
thread::sleep(time::Duration::from_millis(600));
}
// next 2
// next 103
// next 203
// next resume err 100
// next resume err 200
// complete
```
Implementation policy
Based on the problems of rxRust
, another-rxrust
has the following implementation policy.
- It is assumed that the values and functions that can be emitted may be shared between threads.
- Value to emit should be
Clone + Send + Sync
only.
- Use
move
to emit values as much as possible.
- Functions should be
Fn() + Send + Sync
only.
- Default errors use
std::any
. If features=["anyhow"]
use anyhow::Error
.
- Prioritize flexibility over memory efficiency and execution speed.
Usage
default
toml
[dependencies]
another-rxrust = {}
use anyhow::Error
toml
[dependencies]
another-rxrust = {features=["anyhow"]}
Implementation status
Quoted from ReactiveX.
Creating Observables
Operators that originate new Observables.
- [x] Create — create an Observable from scratch by calling observer methods programmatically
- [ ] Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
- [x] Empty/Never/Throw — create Observables that have very precise and limited behavior
- [x] From — convert some other object or data structure into an Observable
- [x] Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
- [x] Just — convert an object or a set of objects into an Observable that emits that or those objects
- [ ] Range — create an Observable that emits a range of sequential integers
- [ ] Repeat — create an Observable that emits a particular item or sequence of items repeatedly
- [ ] Start — create an Observable that emits the return value of a function
- [x] Timer — create an Observable that emits a single item after a given delay
Transforming Observables
Operators that transform items that are emitted by an Observable.
- [ ] Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
- [x] FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
- [ ] GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
- [x] Map — transform the items emitted by an Observable by applying a function to each item
- [ ] Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
- [ ] Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
Filtering Observables
Operators that selectively emit items from a source Observable.
- [ ] Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
- [x] Distinct — suppress duplicate items emitted by an Observable
- [ ] ElementAt — emit only item n emitted by an Observable
- [ ] Filter — emit only those items from an Observable that pass a predicate test
- [x] First — emit only the first item, or the first item that meets a condition, from an Observable
- [ ] IgnoreElements — do not emit any items from an Observable but mirror its termination notification
- [x] Last — emit only the last item emitted by an Observable
- [ ] Sample — emit the most recent item emitted by an Observable within periodic time intervals
- [x] Skip — suppress the first n items emitted by an Observable
- [x] SkipLast — suppress the last n items emitted by an Observable
- [x] Take — emit only the first n items emitted by an Observable
- [x] TakeLast — emit only the last n items emitted by an Observable
Combining Observables
Operators that work with multiple source Observables to create a single Observable
- [ ] And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
- [ ] CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
- [ ] Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
- [x] Merge — combine multiple Observables into one by merging their emissions
- [ ] StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
- [ ] Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
- [ ] Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
Error Handling Operators
Operators that help to recover from error notifications from an Observable
- [x] Catch — recover from an onError notification by continuing the sequence without error
- [x] Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
Observable Utility Operators
A toolbox of useful Operators for working with Observables
- [ ] Delay — shift the emissions from an Observable forward in time by a particular amount
- [ ] Do — register an action to take upon a variety of Observable lifecycle events
- [ ] Materialize/Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
- [x] ObserveOn — specify the scheduler on which an observer will observe this Observable
- [ ] Serialize — force an Observable to make serialized calls and to be well-behaved
- [ ] Subscribe — operate upon the emissions and notifications from an Observable
- [x] SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
- [ ] TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
- [ ] Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
- [ ] Timestamp — attach a timestamp to each item emitted by an Observable
Using — create a disposable resource that has the same lifespan as the Observable
Conditional and Boolean Operators
Operators that evaluate one or more Observables or items emitted by Observables
- [ ] All — determine whether all items emitted by an Observable meet some criteria
- [ ] Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
- [ ] Contains — determine whether an Observable emits a particular item or not
- [ ] DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
- [ ] SequenceEqual — determine whether two Observables emit the same sequence of items
- [x] SkipUntil — discard items emitted by an Observable until a second Observable emits an item
- [x] SkipWhile — discard items emitted by an Observable until a specified condition becomes false
- [x] TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
- [x] TakeWhile — discard items emitted by an Observable after a specified condition becomes false
Mathematical and Aggregate Operators
Operators that operate on the entire sequence of items emitted by an Observable
- [ ] Average — calculates the average of numbers emitted by an Observable and emits this average
- [ ] Concat — emit the emissions from two or more Observables without interleaving them
- [ ] Count — count the number of items emitted by the source Observable and emit only this value
- [ ] Max — determine, and emit, the maximum-valued item emitted by an Observable
- [ ] Min — determine, and emit, the minimum-valued item emitted by an Observable
- [ ] Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
- [ ] Sum — calculate the sum of numbers emitted by an Observable and emit this sum
Connectable Observable Operators
Specialty Observables that have more precisely-controlled subscription dynamics
- [ ] Connect — instruct a connectable Observable to begin emitting items to its subscribers
- [ ] Publish — convert an ordinary Observable into a connectable Observable
- [ ] RefCount — make a Connectable Observable behave like an ordinary Observable
- [ ] Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items
Subjects
- [ ] AsyncSubject
- [x] BehaviorSubject
- [x] PublishSubject
- [ ] ReplaySubject
Schedulers
- [x] Default - A scheduler to run on the current thread.
- [x] NewThread - A scheduler that creates a new thread and executes it there.
Others
- [x] pipe - for custom operators.