# NonBlockingMutex


## Why you should use NonBlockingMutex

NonBlockingMutex is currently the fastest way to do expensive calculations under lock, or to do cheap calculations under lock when concurrency/load/contention is very high. See the benchmarks in the `benches` directory and run them with `cargo bench`.

## Installation

```bash
cargo add non_blocking_mutex
```

## Examples

### Optimized for 1 type of NonBlockingMutexTask

```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::available_parallelism;

/// How many threads can physically access [NonBlockingMutex]
/// simultaneously, needed for computing `shard_count` of [ShardedQueue],
/// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);
/// Will infer exact type and size (0) of this [FnOnce] and
/// make sized [NonBlockingMutex] which takes only this exact [FnOnce]
/// without ever requiring [Box]-ing or dynamic dispatch
non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state: MutexGuard<usize>| {
    *state += 1;
});
```

### Easy to use with any function, but may Box tasks and use dynamic dispatch

```rust
use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::thread::{available_parallelism, scope};

/// How many threads can physically access [NonBlockingMutex]
/// simultaneously, needed for computing `shard_count` of [ShardedQueue],
/// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    /// Will accept any [FnOnce] with a matching signature,
    /// [Box]-ing the task and using dynamic dispatch when needed
    let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_increment) = *state;
                *state += 1;
                *(&mut state_snapshot_after_increment) = *state;
            });
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_decrement) = *state;
                *state -= 1;
                *(&mut state_snapshot_after_decrement) = *state;
            });
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
```

### Optimized for multiple known types of [NonBlockingMutexTask] which capture variables

```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use non_blocking_mutex::non_blocking_mutex_task::NonBlockingMutexTask;
use std::thread::{available_parallelism, scope};

/// How many threads can physically access [NonBlockingMutex]
/// simultaneously, needed for computing `shard_count` of [ShardedQueue],
/// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

struct SnapshotsBeforeAndAfterChangeRefs<
    'snapshot_before_change_ref,
    'snapshot_after_change_ref,
> {
    /// Where to write snapshot of `State` before applying function to `State`
    snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
    /// Where to write snapshot of `State` after applying function to `State`
    snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
}

enum TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    IncrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
    DecrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
}

struct Task<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    task_type: TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref>,
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref>
    Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn new_increment_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::IncrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    /// Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    /// Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }

    fn new_decrement_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::DecrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    /// Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    /// Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref> NonBlockingMutexTask<usize>
    for Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn run_with_state(self, mut state: MutexGuard<usize>) {
        match self.task_type {
            TaskType::IncrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state += 1;
                *snapshot_after_change_ref = *state;
            }
            TaskType::DecrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state -= 1;
                *snapshot_after_change_ref = *state;
            }
        }
    }
}

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    /// Will infer exact type and size of struct [Task] and
    /// make sized [NonBlockingMutex] which takes only [Task]
    /// without ever requiring [Box]-ing or dynamic dispatch
    let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_increment_and_store_snapshots(
                    &mut state_snapshot_before_increment,
                    &mut state_snapshot_after_increment,
                ),
            );
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_decrement_and_store_snapshots(
                    &mut state_snapshot_before_decrement,
                    &mut state_snapshot_after_decrement,
                ),
            );
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
```

## Why you may not want to use NonBlockingMutex

## Benchmarks

See the benchmark logic in the `benches` directory and reproduce the results by running `cargo bench`.

### Single fast operation in single thread without contention

Dynamic `NonBlockingMutex` performs only slightly slower than `Mutex` when there is only 1 thread and 1 operation (because `NonBlockingMutex` doesn't `Box` the first operation in the loop or store it in `ShardedQueue`), while `NonBlockingMutexForSizedTaskWithStaticDispatch` outperforms the other synchronization options when there is only 1 thread and 1 operation.

| Benchmark name                                   |      Time |
|:-------------------------------------------------|----------:|
| increment_once_without_mutex                     |  0.228 ns |
| increment_once_under_non_blocking_mutex_static   |  8.544 ns |
| increment_once_under_non_blocking_mutex_dynamic  |  9.445 ns |
| increment_once_under_mutex_blockingly            |  8.851 ns |
| increment_once_under_mutex_spinny                | 10.603 ns |

### Emulating expensive operation by spinning N times under lock with many threads and highest contention

With higher contention (caused in our case by longer time spent under lock, but it can also be caused by a higher CPU count), `NonBlockingMutex` starts to perform better than `std::sync::Mutex`.

| Benchmark name                                           | Operation count per thread | Spin under lock count | Concurrent thread count | Average time |
|:----------------------------------------------------------|---------------------------:|----------------------:|------------------------:|-------------:|
| increment_under_non_blocking_mutex_concurrently_static    | 1_000                      | 0                     | 24                      | 2.313 ms     |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 1_000                      | 0                     | 24                      | 3.408 ms     |
| increment_under_mutex_blockingly_concurrently             | 1_000                      | 0                     | 24                      | 1.072 ms     |
| increment_under_mutex_spinny_concurrently                 | 1_000                      | 0                     | 24                      | 4.376 ms     |
| increment_under_non_blocking_mutex_concurrently_static    | 10_000                     | 0                     | 24                      | 23.969 ms    |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 10_000                     | 0                     | 24                      | 42.584 ms    |
| increment_under_mutex_blockingly_concurrently             | 10_000                     | 0                     | 24                      | 14.960 ms    |
| increment_under_mutex_spinny_concurrently                 | 10_000                     | 0                     | 24                      | 94.658 ms    |
| increment_under_non_blocking_mutex_concurrently_static    | 1_000                      | 10                    | 24                      | 9.457 ms     |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 1_000                      | 10                    | 24                      | 12.280 ms    |
| increment_under_mutex_blockingly_concurrently             | 1_000                      | 10                    | 24                      | 8.345 ms     |
| increment_under_mutex_spinny_concurrently                 | 1_000                      | 10                    | 24                      | 34.977 ms    |
| increment_under_non_blocking_mutex_concurrently_static    | 10_000                     | 10                    | 24                      | 58.297 ms    |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 10_000                     | 10                    | 24                      | 70.013 ms    |
| increment_under_mutex_blockingly_concurrently             | 10_000                     | 10                    | 24                      | 84.143 ms    |
| increment_under_mutex_spinny_concurrently                 | 10_000                     | 10                    | 24                      | 349.070 ms   |
| increment_under_non_blocking_mutex_concurrently_static    | 1_000                      | 100                   | 24                      | 39.569 ms    |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 1_000                      | 100                   | 24                      | 44.670 ms    |
| increment_under_mutex_blockingly_concurrently             | 1_000                      | 100                   | 24                      | 47.335 ms    |
| increment_under_mutex_spinny_concurrently                 | 1_000                      | 100                   | 24                      | 117.570 ms   |
| increment_under_non_blocking_mutex_concurrently_static    | 10_000                     | 100                   | 24                      | 358.480 ms   |
| increment_under_non_blocking_mutex_concurrently_dynamic   | 10_000                     | 100                   | 24                      | 378.230 ms   |
| increment_under_mutex_blockingly_concurrently             | 10_000                     | 100                   | 24                      | 801.090 ms   |
| increment_under_mutex_spinny_concurrently                 | 10_000                     | 100                   | 24                      | 1200.400 ms  |

## Design explanation

The first thread to call `NonBlockingMutex::run_if_first_or_schedule_on_first` atomically increments `task_count`. If that thread was the one to change `task_count` from 0 to 1, it immediately executes the first task, then atomically decrements `task_count` and checks whether `task_count` changed from 1 to 0. If `task_count` changed from 1 to 0, there are no more tasks and the first thread can leave its execution loop; otherwise it takes the next task from `task_queue`, runs it, decrements `task_count` again after the task was run, and repeats the check, running tasks until none are left.

Threads which are not first also atomically increment `task_count` and check whether they are first; since they are not, they `Box` their task and push the `Box` onto `task_queue`.

This design lets us avoid lock contention, but it adds a roughly constant overhead per task: `Box`-ing the task, pushing the `Box` onto the concurrent `task_queue`, and incrementing and decrementing `task_count`. So when lock contention is low, `NonBlockingMutex` performs worse than `std::sync::Mutex`, but when contention is high (because we have more CPUs or because we do expensive calculations under lock), `NonBlockingMutex` performs better than `std::sync::Mutex`.
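
To make the loop concrete, here is a minimal sketch of the scheduling protocol. It is an illustration under stated assumptions, not the crate's actual implementation: the state is hard-coded to `usize`, every scheduled task is `Box`-ed (like the dynamic variant), `crossbeam_queue::SegQueue` stands in for [ShardedQueue], `SeqCst` orderings are used for simplicity, and the `SketchNonBlockingMutex` name is made up for this example.

```rust
use crossbeam_queue::SegQueue;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Simplified sketch of the scheduling loop described above;
/// an illustration, not the crate's actual implementation.
pub struct SketchNonBlockingMutex {
    task_count: AtomicUsize,
    task_queue: SegQueue<Box<dyn FnOnce(&mut usize) + Send>>,
    state: UnsafeCell<usize>,
}

// SAFETY (sketch-level): `state` is only ever touched by the single thread
// which raised `task_count` from 0 to 1 and is currently draining the queue.
unsafe impl Sync for SketchNonBlockingMutex {}

impl SketchNonBlockingMutex {
    pub fn new(state: usize) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: SegQueue::new(),
            state: UnsafeCell::new(state),
        }
    }

    pub fn run_if_first_or_schedule_on_first(
        &self,
        task: impl FnOnce(&mut usize) + Send + 'static,
    ) {
        // Every caller increments `task_count`; only the 0 -> 1 increment is "first".
        if self.task_count.fetch_add(1, Ordering::SeqCst) != 0 {
            // Not first: hand the task over to the thread which is currently first.
            self.task_queue.push(Box::new(task));
            return;
        }
        // The first thread runs its own task immediately, without boxing it.
        unsafe { task(&mut *self.state.get()) };
        // Keep draining until decrementing `task_count` brings it back to 0.
        while self.task_count.fetch_sub(1, Ordering::SeqCst) != 1 {
            // A task was scheduled but may not be visible in the queue yet,
            // because other threads increment `task_count` before pushing.
            let next_task = loop {
                if let Some(next_task) = self.task_queue.pop() {
                    break next_task;
                }
            };
            unsafe { next_task(&mut *self.state.get()) };
        }
    }
}
```

Note the inner `loop` around `pop`: a scheduling thread increments `task_count` before pushing its task, so the draining thread may briefly observe a non-zero count while the queue still looks empty.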