par_iter_sync: Parallel Iterator With Sequential Output


Crates like rayon do not offer a synchronization mechanism. This crate provides an easy mixture of parallelism and synchronization.

Consider the case where multiple threads share a cache which can be read only after prior tasks have written to it (e.g., reads of task 4 depend on writes of tasks 1-4).

Using IntoParallelIteratorSync trait

```rust
// in concurrency: task1 write | task2 write | task3 write | task4 write
//                                                          ___________\
//                   task4 read depends on task 1-4 write               \
//                                                                       \
// in concurrency:              | task2 read | task3 read | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {

    // writes cache (write the integer in cache), in parallel
    cache.lock().unwrap().insert(task_number);
    // return the task number to the next iterator
    Ok(task_number)

}).into_par_iter_sync(move |task_number| { // <- synced to sequential order

    // reads
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())

// append a for_each to actually run the whole chain
}).for_each(|_| ());
```

Usage Caveat

This crate is designed to clone all resources captured by the closure for each thread. To prevent unintended RAM usage, you may wrap large data structures in Arc (especially vectors of Clone objects).
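
A minimal sketch of this difference, using a hypothetical `heavy_lookup_table` vector introduced here only for illustration: when the vector is wrapped in Arc, each worker thread clones only the pointer rather than the whole allocation.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// hypothetical large data structure, for illustration only
let heavy_lookup_table: Vec<u64> = (0..1_000_000).collect();

// wrapped in Arc, each worker thread clones the pointer,
// not the underlying vector
let table = Arc::new(heavy_lookup_table);

let first_hundred: Vec<u64> = (0..100)
    .into_par_iter_sync(move |i| Ok(table[i]))
    .collect();

assert_eq!(first_hundred[0], 0);
```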

Sequential Consistency

The output order is guaranteed to be the same as the upstream iterator, but the execution order is not sequential.
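
For illustration, a minimal sketch of this property (the `execution_order` log below is introduced only for demonstration): the collected output always follows the upstream order, while the recorded execution order may vary between runs.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};

// log the order in which the closure actually executes
let execution_order = Arc::new(Mutex::new(Vec::new()));
let log = execution_order.clone();

let output: Vec<i32> = (0..100)
    .into_par_iter_sync(move |i| {
        log.lock().unwrap().push(i); // may happen out of order
        Ok(i)
    })
    .collect();

// the output order always matches the upstream iterator
assert_eq!(output, (0..100).collect::<Vec<i32>>());

// the execution order may differ from 0..100 between runs
println!("{:?}", execution_order.lock().unwrap());
```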

Overhead Benchmark

Platform: MacBook Air (Late 2015), 8 GB RAM, Intel Core i5, 1.6 GHz (2 cores).

Result

One million (1,000,000) empty iterations per run.

```
test iter_async::test_par_iter_async::bench_into_par_iter_async ... bench: 125,574,305 ns/iter (+/- 73,066,288)
test test_par_iter::bench_into_par_iter_sync                    ... bench: 339,214,244 ns/iter (+/- 220,914,336)
```

- Async iterator overhead: 125,574,305 / 1,000,000 = 125 ns per iteration (+/- 73 ns).
- Sync iterator overhead: 339,214,244 / 1,000,000 = 339 ns per iteration (+/- 220 ns).

Bench Programs

iter_async

```rust
#[bench]
fn bench_into_par_iter_async(b: &mut Bencher) {
    b.iter(|| {
        (0..1_000_000).into_par_iter_async(|a| Ok(a)).for_each(|_| {})
    });
}
```

iter_sync

```rust
#[bench]
fn bench_into_par_iter_sync(b: &mut Bencher) {
    b.iter(|| {
        (0..1_000_000).into_par_iter_sync(|a| Ok(a)).for_each(|_| {})
    });
}
```

Examples

Mix Syncing and Parallelism By Chaining

```rust
use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)                     // <~ async execution
}).into_par_iter_sync(|i| {   // <- sync order
    Ok(i)                     // <~ async execution
}).into_par_iter_sync(|i| {   // <- sync order
    Ok(i)                     // <~ async execution
});                           // <- sync order
```

Use std::iter::IntoIterator interface

```rust
use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])
```

Closure Captures Variables

Variables captured are cloned to each thread automatically.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use Arc to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // resource_captured is moved into the closure
    // and cloned to worker threads.
    let read_from_resource = resource_captured.get(i).unwrap();
    Ok(*read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])
```

Fast Fail During Exception

The iterator stops once the inner function returns an Err.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;
use log::warn;

/// this function returns Err when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        // you may log the error
        warn!("error at {}", n);
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
    Ok(a)
}).into_par_iter_sync(move |a| {
    // error at 1000
    error_at_1000(a)
}).into_par_iter_sync(move |a| {
    Ok(a)
}).collect();

let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected)
```

You May Choose to Skip Errors

If you do not want to stop on Err, this is a workaround.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
    // error at 3, but skip
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
```

Implementation Note

Output Buffering

Synchronization Mechanism

Error Handling and Dropping