Swap Queue

A lock-free, thread-owned queue whereby stealers take queued tasks in their entirety by swapping out the underlying buffer. It is meant to be used as a [thread_local] paired with [tokio::task::spawn], serving as a highly performant take-all batching mechanism. In benchmarks on ARM it is roughly 11-19% faster than [crossbeam::deque::Worker] and 28-45% faster than [tokio::sync::mpsc].
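To make the take-all semantics concrete, here is a minimal sketch assuming the behavior the example below suggests: `push` hands out a stealer only for the first item of a fresh buffer, and the stealer's `take` returns everything pushed since then. The `Worker`, `Stealer` and `Shared` types here are hypothetical re-implementations, not the crate's API. They guard the buffer with a `Mutex` and make `take` synchronous, whereas swap_queue itself is lock-free and its `take` is awaited.

```rust
use std::mem;
use std::sync::{Arc, Mutex};

// Hypothetical shared state: the current buffer plus whether a stealer
// for that buffer is still outstanding. (A lock-based stand-in only.)
struct Shared<T> {
    buffer: Vec<T>,
    stealer_issued: bool,
}

/// Hypothetical owner-side handle; items are pushed only through it.
pub struct Worker<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Hypothetical handle that takes the worker's whole buffer in one swap.
pub struct Stealer<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Worker<T> {
    pub fn new() -> Self {
        Worker {
            shared: Arc::new(Mutex::new(Shared {
                buffer: Vec::new(),
                stealer_issued: false,
            })),
        }
    }

    /// Pushes an item. Returns a new stealer only when none is outstanding,
    /// i.e. for the first push into a fresh buffer; otherwise returns `None`.
    pub fn push(&self, item: T) -> Option<Stealer<T>> {
        let mut shared = self.shared.lock().unwrap();
        shared.buffer.push(item);
        if shared.stealer_issued {
            None
        } else {
            shared.stealer_issued = true;
            Some(Stealer {
                shared: Arc::clone(&self.shared),
            })
        }
    }
}

impl<T> Stealer<T> {
    /// Consumes the stealer and swaps the buffer out for an empty one,
    /// returning every item pushed since this stealer was issued.
    pub fn take(self) -> Vec<T> {
        let mut shared = self.shared.lock().unwrap();
        shared.stealer_issued = false;
        mem::take(&mut shared.buffer)
    }
}
```

In this sketch, swapping the whole `Vec` out with `mem::take` keeps the take O(1) regardless of batch size: the stealer never walks the queue item by item, and the next push after a take starts a new buffer and issues a new stealer.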

Example

```rust
use swap_queue::Worker;
use tokio::{
  runtime::Handle,
  sync::oneshot::{channel, Sender},
};

// Jemalloc makes this library substantially faster
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// Worker needs to be thread local because it is !Sync
thread_local! {
  static QUEUE: Worker<(u64, Sender<u64>)> = Worker::new();
}

// This mechanism batches optimally without overhead within an async context,
// because `take` will be polled after everything else already scheduled
async fn push_echo(i: u64) -> u64 {
  {
    let (tx, rx) = channel();

    QUEUE.with(|queue| {
      // A new stealer is issued for every buffer swap
      if let Some(stealer) = queue.push((i, tx)) {
        Handle::current().spawn(async move {
          // Take the underlying buffer in its entirety
          let batch = stealer.take().await;

          // Some sort of batched operation, such as a database load

          batch.into_iter().for_each(|(i, tx)| {
            tx.send(i).ok();
          });
        });
      }
    });

    rx
  }
  .await
  .unwrap()
}
```
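The comment on `push_echo` attributes the batching to `take` being polled after everything else already scheduled. The deterministic simulation below illustrates that reasoning under a simplifying assumption: a strictly FIFO, single-threaded run queue. The `RunQueue`, `push` and `simulate` names are hypothetical and illustrative only. Real executors such as tokio's may order tasks differently, so treat this as a model of the idea rather than of tokio's scheduler.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::mem;
use std::rc::Rc;

// Hypothetical single-threaded FIFO run queue standing in for an async executor.
type RunQueue = Rc<RefCell<VecDeque<Box<dyn FnOnce()>>>>;

/// Pushes `value` into the shared buffer. If the buffer was empty, this push
/// schedules a take task at the *back* of the run queue, so the take only runs
/// after every task already queued has had its chance to push.
fn push(
    value: u64,
    buffer: &Rc<RefCell<Vec<u64>>>,
    batches: &Rc<RefCell<Vec<Vec<u64>>>>,
    run_queue: &RunQueue,
) {
    let was_empty = buffer.borrow().is_empty();
    buffer.borrow_mut().push(value);
    if was_empty {
        let buffer = Rc::clone(buffer);
        let batches = Rc::clone(batches);
        run_queue.borrow_mut().push_back(Box::new(move || {
            // Swap the whole buffer out for an empty one and record it as one batch.
            let batch = mem::take(&mut *buffer.borrow_mut());
            batches.borrow_mut().push(batch);
        }));
    }
}

/// Queues `n` push tasks, drives the run queue to completion in FIFO order,
/// and returns the batches the take tasks observed.
fn simulate(n: u64) -> Vec<Vec<u64>> {
    let run_queue: RunQueue = Rc::new(RefCell::new(VecDeque::new()));
    let buffer: Rc<RefCell<Vec<u64>>> = Rc::new(RefCell::new(Vec::new()));
    let batches: Rc<RefCell<Vec<Vec<u64>>>> = Rc::new(RefCell::new(Vec::new()));

    for i in 0..n {
        let (b, bs, rq) = (Rc::clone(&buffer), Rc::clone(&batches), Rc::clone(&run_queue));
        run_queue
            .borrow_mut()
            .push_back(Box::new(move || push(i, &b, &bs, &rq)));
    }

    // Pop one task at a time; the borrow is released before the task runs,
    // so tasks are free to schedule further tasks.
    loop {
        let task = run_queue.borrow_mut().pop_front();
        match task {
            Some(task) => task(),
            None => break,
        }
    }

    let result = batches.borrow().clone();
    result
}
```

Because the first push only schedules the take at the back of the queue, all pushes already queued land in the buffer before the swap, so `simulate(5)` yields a single batch of five items rather than five batches of one.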

Benchmarks

Benchmarks were run on an AWS t4g.medium instance running the Amazon Linux 2 AMI (HVM).

Benchmark charts: 64, 128, 256, 512 and 1024 tasks.

CI runs tests under ThreadSanitizer, LeakSanitizer, Miri and Loom.