par_iter_sync: Parallel Iterator With Sequential Output

Crates like rayon do not offer a synchronization mechanism. This crate makes it easy to mix parallelism with synchronization, for example by running tasks concurrently while synchronizing at certain steps.

Consider the case where multiple threads share a cache that may be read only after prior tasks have written to it (e.g., the read in task 4 depends on the writes of tasks 1-4).

Using IntoParallelIteratorSync trait

```rust
// in concurrency: task1 write | task2 write | task3 write | task4 write
//                                                      \_____________\
//             task4 read depends on task 1-4 write      \___________
//                                                                   \
// in concurrency:             | task2 read  | task3 read  | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {

    // writes cache (write the integer in cache), in parallel
    cache.lock().unwrap().insert(task_number);
    // return the task number to the next iterator
    Ok(task_number)

}).into_par_iter_sync(move |task_number| { // <- synced to sequential order

    // reads
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())

// append a for each to actually run the whole chain
}).for_each(|_| ());
```

Sequential Consistency

The output order is guaranteed to be the same as the upstream iterator, but the execution order is not sequential.
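
To make the distinction between execution order and output order concrete, here is a minimal standard-library sketch. It is not the crate's implementation: `ordered_parallel_map` is a hypothetical helper that spawns one thread per input (reasonable only for small inputs), lets the workers finish in any order, and buffers early results until all of their predecessors have been released.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper (not part of this crate): runs `f` on every input in
/// its own thread and returns the results in input order.
fn ordered_parallel_map<T, R, F>(inputs: Vec<T>, f: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Clone + 'static,
{
    let (tx, rx) = mpsc::channel();
    let total = inputs.len();

    for (index, input) in inputs.into_iter().enumerate() {
        let tx = tx.clone();
        let f = f.clone();
        // Workers may finish in any order; each tags its result with its index.
        thread::spawn(move || {
            let _ = tx.send((index, f(input)));
        });
    }
    // Drop the original sender so the receiver stops once all workers finish.
    drop(tx);

    let mut pending = BTreeMap::new(); // results that arrived ahead of their turn
    let mut output = Vec::with_capacity(total);
    let mut next = 0usize; // index of the next result allowed to be emitted

    for (index, value) in rx {
        pending.insert(index, value);
        // Release every buffered result whose predecessors are already out.
        while let Some(value) = pending.remove(&next) {
            output.push(value);
            next += 1;
        }
    }
    output
}
```

Even if later inputs finish first, the returned vector follows the order of `inputs`, which is the guarantee this section describes.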

Examples

Mix Syncing and Parallelism By Chaining

```rust
use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)                       // <~ async execution
}).into_par_iter_sync(|i| {     // <- sync order
    Ok(i)                       // <~ async execution
}).into_par_iter_sync(|i| {     // <- sync order
    Ok(i)                       // <~ async execution
});                             // <- sync order
```

Use std::iter::IntoIterator interface

```rust
use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])
```

Closure Captures Variables

Captured variables are cloned to each thread automatically.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use Arc to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // resource_captured is moved into the closure
    // and cloned to worker threads.
    let read_from_resource = resource_captured.get(i).unwrap();
    Ok(*read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])
```
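
The reason an Arc saves memory when a captured value is cloned per thread can be shown with the standard library alone. The sketch below is an illustration, not this crate's internals: `parallel_sum` is a hypothetical helper in which each worker receives its own `Arc` handle, so only a pointer and a reference count are copied while the underlying `Vec` exists once.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: sums a shared vector using `workers` threads,
/// each holding its own clone of the Arc handle.
fn parallel_sum(data: Arc<Vec<i32>>, workers: usize) -> i32 {
    let workers = workers.max(1);
    // Ceiling division so every element falls into exactly one chunk.
    let chunk = (data.len() + workers - 1) / workers;

    let handles: Vec<_> = (0..workers)
        .map(|w| {
            // Cloning the Arc copies a pointer, not the vector's contents.
            let data = Arc::clone(&data);
            thread::spawn(move || {
                let start = (w * chunk).min(data.len());
                let end = (start + chunk).min(data.len());
                data[start..end].iter().sum::<i32>()
            })
        })
        .collect();

    // Combine the partial sums from every worker.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Capturing a plain `Vec` instead would copy the whole vector each time the closure is cloned, which is what wrapping it in an Arc avoids.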

Fast Fail During Exception

The iterator stops once the inner function returns an Err.

```rust
use par_iter_sync::IntoParallelIteratorSync;
use log::warn;

/// this function returns Err when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        // you may log this error
        warn!("error at 1000");
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
    Ok(a)
}).into_par_iter_sync(move |a| {
    // error at 1000
    error_at_1000(a)
}).into_par_iter_sync(move |a| {
    Ok(a)
}).collect();

let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected)
```
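
The fast-fail behaviour has a simple sequential analogue in the standard library, which may help when reasoning about what the output contains. The sketch below is only an analogy under that assumption: `until_first_error` is a hypothetical helper that keeps the `Ok` values preceding the first `Err` and discards everything after it.

```rust
/// Hypothetical helper: collects `Ok` values until the first `Err` is seen.
/// Items after the first `Err` are never yielded, even if they are `Ok`.
fn until_first_error<T, E>(results: impl IntoIterator<Item = Result<T, E>>) -> Vec<T> {
    results
        .into_iter()
        // `map_while` stops at the first `None`, i.e. at the first `Err`.
        .map_while(Result::ok)
        .collect()
}
```

As in the example above, the output is a prefix of the successful values, and the error itself does not appear in the collected result.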

You may choose to skip errors

If you do not want to stop on Err, wrap the inner result in an outer Ok as a workaround.

```rust
use par_iter_sync::IntoParallelIteratorSync;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
    // error at 3, but skip
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
```
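
Once the nested results have been collected, they usually need to be separated into successes and failures. The following is a small post-processing sketch using only the standard library; `split_results` is a hypothetical helper, not part of this crate.

```rust
/// Hypothetical helper: splits collected results into the successful values
/// and the positions at which an error was recorded.
fn split_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<usize>) {
    let mut values = Vec::new();
    let mut failed_at = Vec::new();
    for (index, result) in results.into_iter().enumerate() {
        match result {
            Ok(value) => values.push(value),
            // Record where the error occurred; the error value is dropped.
            Err(_) => failed_at.push(index),
        }
    }
    (values, failed_at)
}
```

Because the output order matches the input order, the recorded positions identify which tasks failed.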

Implementation Note

Output Buffering

Synchronization Mechanism

Error handling and Dropping