swap-buffer-queue

A buffering MPSC queue.

This library is intended to be a (better, I hope) alternative to traditional MPSC queues in the context of a buffering consumer, by moving the buffering part directly into the queue.

It is especially well suited for IO writing workflow, see buffer implementations.

The crate is no_std (some buffer implementations may require std).

Example

```rust use std::ops::Deref; use swapbufferqueue::{buffer::VecBuffer, Queue};

// Initialize the queue with a capacity let queue: Queue, usize> = Queue::withcapacity(42); // Enqueue some values queue.tryenqueue(0).unwrap(); queue.tryenqueue(1).unwrap(); // Dequeue a slice to the enqueued values let slice = queue.trydequeue().unwrap(); asserteq!(slice.deref(), &[0, 1]); // Enqueued values can also be retrieved asserteq!(slice.into_iter().collect::>(), vec![0, 1]); ```

Buffer implementations

In addition to simple ArrayBuffer and VecBuffer, this crate provides useful write-oriented implementations.

write

WriteArrayBuffer and WriteVecBuffer are well suited when there are objects to be serialized with a known-serialization size. Indeed, objects can then be serialized directly on the queue's buffer, avoiding allocation.

```rust use swapbufferqueue::Queue; use swapbufferqueue::write::{WriteBytesSlice, WriteVecBuffer};

// the slice to be written in the queue's buffer (not too complex for the example)

[derive(Debug)]

struct Slice(Vec); impl WriteBytesSlice for Slice { fn size(&self) -> usize { self.0.len() } fn write(&mut self, slice: &mut [u8]) { slice.copyfromslice(&self.0); } } //! // Creates a WriteVecBuffer queue with a 2-bytes header let queue: Queue, Slice> = Queue::withcapacity((1 << 16) - 1); queue.tryenqueue(Slice(vec![0; 256])).unwrap(); queue.tryenqueue(Slice(vec![42; 42])).unwrap(); let mut slice = queue.trydequeue().unwrap(); // Adds a header with the len of the buffer let len = (slice.len() as u16).tobebytes(); slice.header().copyfromslice(&len); // Let's pretend we have a writer let mut writer: Vec = Default::default(); assert_eq!( std::io::Write::write(&mut writer, slice.frame()).unwrap(), 300 ); ```

write_vectored

WriteVectoredArrayBuffer and WriteVectoredVecBuffer allows buffering a slice of IoSlice, saving the cost of dequeuing io-slices one by one to collect them after. (Internally, two buffers are used, one of the values, and one for the io-slices)

As a convenience, total size of the buffered io-slices can be retrieved.

```rust use std::io::{IoSlice, Write}; use swapbufferqueue::{writevectored::WriteVectoredVecBuffer}; use swapbuffer_queue::Queue;

// Creates a WriteVectoredVecBuffer queue let queue: Queue>, Vec> = Queue::withcapacity(100); queue.tryenqueue(vec![0; 256]).unwrap(); queue.tryenqueue(vec![42; 42]).unwrap(); let mut slice = queue.trydequeue().unwrap(); // Adds a header with the total size of the slices let totalsize = (slice.totalsize() as u16).tobebytes(); let mut frame = slice.frame(.., Some(IoSlice::new(&totalsize)), None); // Let's pretend we have a writer let mut writer: Vec = Default::default(); asserteq!(writer.write_vectored(&mut frame).unwrap(), 300); ```

How it works

Internally, this queue use 2 buffers: one being used for enqueuing while the other is dequeued.

When Queue::try_enqueue is called, it reserves atomically a slot in the current enqueuing buffer. The value is then inserted in the slot.

When Queue::try_dequeue is called, both buffers are swapped atomically, so dequeued buffer will contain previously enqueued values, and new enqueued ones will go to the other (empty) buffer.

As the two-phase enqueuing cannot be atomic, the queue can be in a transitory state, where slots have been reserved but have not been written yet. This issue is mitigated using a spin loop in dequeuing method. If the spin loop fails, the transitory state is saved and spin loop will be retried at the next dequeue.

Performance

swap-buffer-queue is very performant – it's actually the fastest MPSC queue I know.

Here is the crossbeam benchmark forked

| benchmark | crossbeam | swap-buffer-queue | |---------------|-----------|-------------------| | bounded1mpsc | 1.545s | 1.341s | | bounded1spsc | 1.652s | 1.138s | | boundedmpsc | 0.362s | 0.178s | | boundedseq | 0.190s | 0.156s | | bounded_spsc | 0.115s | 0.139s |

However, a large enough capacity is required to reach maximum performance; otherwise, high contention scenario may be penalized. This is because the algorithm put all the contention on a single atomic integer (instead of two for crossbeam).