Crates like rayon do not offer a synchronization mechanism.
This crate provides an easy mix of parallelism and synchronization,
such as executing tasks concurrently while synchronizing at certain steps.
Consider the case where multiple threads share a cache that can be read only after prior tasks have written to it (e.g., the reads of task 4 depend on the writes of tasks 1-4).
Using the `IntoParallelIteratorSync` trait:
```rust
// in concurrency: task1 write | task2 write | task3 write | task4 write
//                 ____________________________________________________\
//                  task4 read depends on task 1-4 write           ____\
//                                                                     \
// in concurrency:              | task2 read | task3 read | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {

    // write to cache (insert the integer), in parallel
    cache.lock().unwrap().insert(task_number);
    // return the task number to the next iterator
    Ok(task_number)

}).into_par_iter_sync(move |task_number| { // <- synced to sequential order

    // read from cache
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())

// append a for_each to actually run the whole chain
}).for_each(|_| ());
```
This crate is designed to clone all resources captured by the closure
for each thread. To prevent unintended RAM usage, you may wrap
large data structures in `Arc`
(especially vectors of `Clone`
objects).
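As a std-only illustration (not this crate's code) of why `Arc` saves memory here: cloning an `Arc` copies a pointer and bumps a reference count, while cloning the underlying `Vec` would deep-copy every element.

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    // a large resource behind an Arc
    let big = Arc::new(vec![0u8; 1_000_000]);

    let handles: Vec<_> = (0..4)
        .map(|_| {
            // each worker receives a cheap pointer copy, not a deep copy
            let shared = Arc::clone(&big);
            thread::spawn(move || shared.len())
        })
        .collect();

    for h in handles {
        assert_eq!(h.join().unwrap(), 1_000_000);
    }

    // all worker clones have been dropped; one allocation remains
    assert_eq!(Arc::strong_count(&big), 1);
}
```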
The output order is guaranteed to be the same as the upstream iterator, but the execution order is not sequential.
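This guarantee can be pictured with a small std-only sketch (the helper `squares_in_order` is made up for illustration, not part of the crate): tasks complete in arbitrary order, but each result is placed back at its task index, so the collected output follows the upstream order.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// run n tasks on n threads; completion order is arbitrary,
// but results are re-placed by task index, restoring input order
fn squares_in_order(n: u64) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            // later tasks may finish first
            thread::sleep(Duration::from_millis((n - i) * 5));
            tx.send((i, i * i)).unwrap();
        });
    }
    drop(tx);

    let mut out = vec![0u64; n as usize];
    for (i, v) in rx {
        out[i as usize] = v; // place by task index
    }
    out
}

fn main() {
    assert_eq!(squares_in_order(8), vec![0, 1, 4, 9, 16, 25, 36, 49]);
}
```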
Platform: MacBook Air (2015), 8 GB RAM, Intel Core i5 @ 1.6 GHz (2 cores).
One million (1,000,000) empty iterations per run.
```
test iter_async::test_par_iter_async::bench_into_par_iter_async ... bench: 125,574,305 ns/iter (+/- 73,066,288)
test test_par_iter::bench_into_par_iter_sync ... bench: 339,214,244 ns/iter (+/- 220,914,336)
```
Result:
- Async iterator overhead: 125,574,305 / 1,000,000 ≈ 125 ns/iter (+/- 73 ns).
- Sync iterator overhead: 339,214,244 / 1,000,000 ≈ 339 ns/iter (+/- 220 ns).
```rust
#[bench]
fn bench_into_par_iter_async(b: &mut Bencher) {
    b.iter(|| {
        (0..1_000_000).into_par_iter_async(|a| Ok(a)).for_each(|_| {})
    });
}
```
```rust
#[bench]
fn bench_into_par_iter_sync(b: &mut Bencher) {
    b.iter(|| {
        (0..1_000_000).into_par_iter_sync(|a| Ok(a)).for_each(|_| {})
    });
}
```
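Note that `#[bench]` and `Bencher` require a nightly toolchain. On stable, a rough per-item figure can be obtained with `std::time::Instant` (a sketch with a stand-in workload; the helper name is made up and the numbers vary by machine):

```rust
use std::time::Instant;

// estimate per-item overhead of an iterator pipeline
fn ns_per_iter(n: u64) -> u128 {
    let start = Instant::now();
    // stand-in for the chain being measured
    let sum: u64 = (0..n).map(|a| a).sum();
    assert_eq!(sum, n * (n - 1) / 2); // keep the work from being optimized away
    start.elapsed().as_nanos() / n as u128
}

fn main() {
    println!("{} ns/iter", ns_per_iter(1_000_000));
}
```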
```rust
use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)                       // <~ async execution
}).into_par_iter_sync(|i| {     // <- sync order
    Ok(i)                       // <~ async execution
}).into_par_iter_sync(|i| {     // <- sync order
    Ok(i)                       // <~ async execution
});                             // <- sync order
```
The result implements the `std::iter::IntoIterator` interface:

```rust
use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..100).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])
```
Variables captured are cloned to each thread automatically.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use `Arc` to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // `resource_captured` is moved into the closure
    // and cloned to worker threads.
    let read_from_resource = resource_captured.get(i).unwrap();
    Ok(*read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
```
The iterator stops once the inner function returns an `Err`.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use log::warn;

/// this function returns `Err` when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        // you may log this error
        warn!("Some Error Occurs");
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(error_at_1000).collect();
let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected);
```

If you do not want to stop on `Err`, this is a workaround:

```rust
use par_iter_sync::IntoParallelIteratorSync;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(|n| {
    // error at n == 3, but wrapped in `Ok`
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
```

Implementation Note
Output Buffering

Synchronization Mechanism

When a worker thread finishes a task, it registers its thread ID (`thread_number`)
and the task ID (`task_number`) into a mpsc channel. When `next()`
is called, the consumer fetches from the task registry
(`task_order`) the next thread ID and task ID.
It then receives from the channel of that thread, and checks whether
the current task (`current`) matches the task ID to ensure that no thread
has run into an exception. If `next()`
detects that some thread has not produced a result due to an exception,
it calls `kill()`, which stops threads from fetching new tasks,
flushes the remaining tasks, and joins the worker threads.

Error Handling and Dropping