ShardedQueue


Why you should use ShardedQueue

ShardedQueue is currently the fastest collection under high concurrency and load among the most popular solutions, like `concurrent-queue`. See the benchmarks in the `benches` directory and run them with:

```bash
cargo bench
```

Installation

```bash
cargo add sharded_queue
```

Example

```rust
use sharded_queue::ShardedQueue;
use std::thread::available_parallelism;

/// How many threads can physically access [ShardedQueue]
/// simultaneously, needed for computing `shard_count`
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let sharded_queue = ShardedQueue::new(max_concurrent_thread_count);

sharded_queue.push_back(1);
let item = sharded_queue.pop_front_or_spin_wait_item();
```

Why you may not want to use ShardedQueue

ShardedQueue doesn't guarantee order of retrieval: if your logic depends on strict FIFO ordering between push-es and pop-s, use an ordered concurrent queue instead.

Benchmarks

ShardedQueue outperforms other concurrent queues. See the benchmark logic in the `benches` directory and reproduce the results by running:

```bash
cargo bench
```

| Benchmark name | Operation count per thread | Concurrent thread count | Average time |
|:-------------------------------------------|---------------------------:|------------------------:|-------------:|
| sharded_queue_push_and_pop_concurrently | 1_000 | 24 | 1.1344 ms |
| concurrent_queue_push_and_pop_concurrently | 1_000 | 24 | 4.8130 ms |
| crossbeam_queue_push_and_pop_concurrently | 1_000 | 24 | 5.3154 ms |
| queue_mutex_push_and_pop_concurrently | 1_000 | 24 | 6.4846 ms |
| sharded_queue_push_and_pop_concurrently | 10_000 | 24 | 8.1651 ms |
| concurrent_queue_push_and_pop_concurrently | 10_000 | 24 | 44.660 ms |
| crossbeam_queue_push_and_pop_concurrently | 10_000 | 24 | 49.234 ms |
| queue_mutex_push_and_pop_concurrently | 10_000 | 24 | 69.207 ms |
| sharded_queue_push_and_pop_concurrently | 100_000 | 24 | 77.167 ms |
| concurrent_queue_push_and_pop_concurrently | 100_000 | 24 | 445.88 ms |
| crossbeam_queue_push_and_pop_concurrently | 100_000 | 24 | 434.00 ms |
| queue_mutex_push_and_pop_concurrently | 100_000 | 24 | 476.59 ms |

Design explanation

ShardedQueue is designed to be used in schedulers and in NonBlockingMutex as the most efficient collection under the highest concurrency and load. A concurrent stack can't outperform it: a queue spreads pop and push contention between its front and back, while a stack both pop-s and push-es at the back, which gives it double the contention of a queue, with the same number of atomic increments per pop or push.

ShardedQueue uses an array of queues (shards), each protected by its own Mutex. It atomically increments head_index or tail_index when a pop or push happens, and computes the shard index for the current operation by applying a modulo operation to head_index or tail_index.
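This scheme can be sketched with std types only (a simplified illustration with hypothetical names, not the crate's actual internals):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Simplified model of the sharding scheme: queues (shards) behind
/// separate `Mutex`-es, with atomic counters picking the shard
struct ToyShardedQueue<T> {
    shards: Vec<Mutex<VecDeque<T>>>,
    head_index: AtomicUsize,
    tail_index: AtomicUsize,
}

impl<T> ToyShardedQueue<T> {
    fn new(shard_count: usize) -> Self {
        Self {
            shards: (0..shard_count)
                .map(|_| Mutex::new(VecDeque::new()))
                .collect(),
            head_index: AtomicUsize::new(0),
            tail_index: AtomicUsize::new(0),
        }
    }

    fn push_back(&self, item: T) {
        // Atomically reserve the next push slot, then map it to a shard
        let index = self.tail_index.fetch_add(1, Ordering::Relaxed) % self.shards.len();
        self.shards[index].lock().unwrap().push_back(item);
    }

    fn pop_front(&self) -> Option<T> {
        // Pops consume shards in the same order pushes filled them
        let index = self.head_index.fetch_add(1, Ordering::Relaxed) % self.shards.len();
        self.shards[index].lock().unwrap().pop_front()
    }
}
```

Under contention, different threads land on different shards, so their Mutex acquisitions don't collide.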

The modulo operation is optimized using the fact that x % 2^n == x & (2^n - 1). So, as long as the count of queues (shards) is a power of two, we can compute the modulo very efficiently with the formula operation_number % shard_count == operation_number & (shard_count - 1)
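The identity is easy to verify directly (a standalone check; `shard_index` is an illustrative helper, not a crate API):

```rust
/// For a power-of-two `shard_count`, `x % shard_count` equals
/// `x & (shard_count - 1)`: the mask keeps exactly the low bits
/// that form the remainder
fn shard_index(operation_number: usize, shard_count: usize) -> usize {
    debug_assert!(shard_count.is_power_of_two());
    operation_number & (shard_count - 1)
}
```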

As long as the count of queues (shards) is a power of two and is greater than or equal to the number of CPU-s, and CPU-s spend ~same time in push/pop (time is ~same, since it is amortized O(1)), multiple CPU-s physically can't access the same shards simultaneously, and we get the best possible performance. Synchronizing the underlying non-concurrent queue costs only:
- 1 additional atomic increment per push or pop (incrementing head_index or tail_index)
- 1 additional compare_and_swap and 1 atomic store (uncontended Mutex acquire and release)
- 1 cheap bit operation (to get the modulo)
- 1 get from the queue (shard) list by index
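When the CPU count is not a power of two, it can be rounded up with std's `next_power_of_two` (a sketch of how a suitable shard count could be chosen; whether the crate does this internally is not stated here):

```rust
use std::thread::available_parallelism;

/// Smallest power of two that is >= the number of CPU-s, so the
/// bit-mask modulo applies and shards outnumber concurrent CPU-s
fn shard_count_for_cpus() -> usize {
    let cpu_count = available_parallelism().map(|count| count.get()).unwrap_or(1);
    cpu_count.next_power_of_two()
}
```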

Complex example

```rust
use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct NonBlockingMutex<'captured_variables, State: ?Sized> {
    task_count: AtomicUsize,
    task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
    unsafe_state: UnsafeCell<State>,
}

/// [NonBlockingMutex] is needed to run actions atomically without thread blocking, or context
/// switch, or spin lock contention, or rescheduling on some scheduler
///
/// Notice that it uses [ShardedQueue], which doesn't guarantee order of retrieval, hence
/// [NonBlockingMutex] doesn't guarantee order of execution either, even of already added
/// items
impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
    pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: ShardedQueue::new(max_concurrent_thread_count),
            unsafe_state: UnsafeCell::new(state),
        }
    }

    /// Please don't forget that order of execution is not guaranteed. Atomicity of operations
    /// is guaranteed, but order can be random
    pub fn run_if_first_or_schedule_on_first(
        &self,
        run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
    ) {
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            self.task_queue.push_back(Box::new(run_with_state));
        } else {
            // If we acquired the first lock, run should be executed immediately and the run loop started
            run_with_state(unsafe { MutexGuard::new(self) });
            // Note that if `fetch_sub` != 1
            // => some thread entered the first if block in this method
            // => [ShardedQueue::push_back] is guaranteed to be called
            // => [ShardedQueue::pop_front_or_spin_wait_item] will not deadlock while it spins until it gets an item
            //
            // Notice that we run the action first, and only then decrement the count
            // with releasing (pushing) memory changes, even if it looks otherwise
            while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
                self.task_queue.pop_front_or_spin_wait_item()(unsafe { MutexGuard::new(self) });
            }
        }
    }
}

/// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// and [std::sync::MutexGuard]
///
/// These are the only places where `T: Send` matters; all other
/// functionality works fine on a single thread
unsafe impl<'captured_variables, State: ?Sized + Send> Send
    for NonBlockingMutex<'captured_variables, State>
{
}
unsafe impl<'captured_variables, State: ?Sized + Send> Sync
    for NonBlockingMutex<'captured_variables, State>
{
}

/// Code was mostly taken from [std::sync::MutexGuard], it is expected to protect [State]
/// from moving out of synchronized loop
pub struct MutexGuard<
    'captured_variables,
    'non_blocking_mutex_ref,
    State: ?Sized + 'non_blocking_mutex_ref,
> {
    non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    /// Adding it to ensure that [MutexGuard] implements [Send] and [Sync] in same cases
    /// as [std::sync::MutexGuard] and protects [State] from going out of synchronized
    /// execution loop
    ///
    /// todo remove when this error is no longer actual
    /// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
    phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
}

// todo uncomment when this error is no longer actual
// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
//     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }
unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Sync> Sync
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
    MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    unsafe fn new(
        non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    ) -> Self {
        Self {
            non_blocking_mutex,
            phantom_unsend: PhantomData,
        }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> Deref
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    type Target = State;

fn deref(&self) -> &State {
    unsafe { &*self.non_blocking_mutex.unsafe_state.get() }
}

}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> DerefMut
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn deref_mut(&mut self) -> &mut State {
        unsafe { &mut *self.non_blocking_mutex.unsafe_state.get() }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Debug> Debug
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Display> Display
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        (**self).fmt(f)
    }
}
```
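The task_count handshake in run_if_first_or_schedule_on_first can also be illustrated in isolation (a simplified single-threaded sketch with hypothetical names, using RefCell and VecDeque in place of UnsafeCell and ShardedQueue):

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Simplified model of the task-count handshake: the caller that
/// increments the count from 0 runs tasks; later callers only enqueue
struct ToyExecutor {
    task_count: AtomicUsize,
    task_queue: RefCell<VecDeque<Box<dyn FnOnce(&mut usize)>>>,
    state: RefCell<usize>,
}

impl ToyExecutor {
    fn new(state: usize) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: RefCell::new(VecDeque::new()),
            state: RefCell::new(state),
        }
    }

    fn run_or_schedule(&self, task: Box<dyn FnOnce(&mut usize)>) {
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            // Someone is already draining tasks: just enqueue
            self.task_queue.borrow_mut().push_back(task);
        } else {
            // We took the count from 0 to 1: run, then keep draining
            // until the decrement observes the count returning to 0
            task(&mut self.state.borrow_mut());
            while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
                let next = self.task_queue.borrow_mut().pop_front().unwrap();
                next(&mut self.state.borrow_mut());
            }
        }
    }
}
```

In the real NonBlockingMutex the same counter protocol is what makes the spin in pop_front_or_spin_wait_item safe: a non-zero count proves a matching push_back is on its way.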