lossyq for Rust

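lossyq is a single-publisher, single-subscriber queue with the following characteristics:

- the writer never waits for the reader
- putting an element never allocates memory
- when the queue is full, new elements overwrite the oldest unread ones, so the reader may lose updates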

Example

```rust
extern crate lossyq;

fn main() {
    use std::thread;

    // create a very small channel of 2 elements
    let (mut tx, mut rx) = lossyq::spsc::channel(2, 0 as i32);
    let t = thread::spawn(move || {
        for i in 1..4 {
            // tx.put() takes a closure that in turn gets a
            // writable reference to the next element in the queue
            tx.put(|v| *v = i);
        }
    });
    t.join().unwrap();
    // rx.iter() returns an iterator, which may be
    // passed on to other adapters
    let sum = rx.iter().fold(0, |acc, num| acc + num);

    // this should print 5 as the writer sent three items to the
    // queue: [1,2,3] and the first item got overwritten by the
    // last one
    println!("sum={}", sum);
}
```

Putting an element

As the example above shows, put takes a closure that in turn receives a mutable reference to an element in the queue. The closure fills that preallocated element in place, so insertion never needs to allocate memory.
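To make the idea concrete, here is a minimal single-threaded sketch of a closure-based put on a fixed-size ring. `FixedRing` and its fields are hypothetical names, not lossyq's API, and the sketch ignores concurrency entirely; it only shows that the closure writes into storage that was allocated once, up front, and that the oldest slot is reused when the ring is full.

```rust
/// Hypothetical fixed-capacity ring (not lossyq's API). All storage is
/// allocated once in `new`; `put` only hands out references into it.
struct FixedRing<T> {
    slots: Vec<T>, // preallocated elements
    next: usize,   // number of puts so far
}

impl<T: Clone> FixedRing<T> {
    fn new(size: usize, default_value: T) -> Self {
        FixedRing { slots: vec![default_value; size], next: 0 }
    }

    /// Calls `setter` with a mutable reference to the next slot, which it
    /// overwrites in place. When the ring is full, the oldest slot is reused.
    fn put<F: FnOnce(&mut T)>(&mut self, setter: F) {
        let idx = self.next % self.slots.len();
        setter(&mut self.slots[idx]);
        self.next += 1;
    }
}
```

Putting 1, 2 and 3 into a ring of two elements leaves the slots holding 3 and 2: the third put reused the slot of the first one instead of growing the vector.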

Reading elements

When reading, the iter function returns an iterator over all elements that are readable at the time of the call. If the writer writes more elements to the queue afterwards, the iterator remains valid but does not see the newly written elements. To see them, create a new iterator with another iter call.
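These reading semantics can be modelled in a few lines, assuming a simplified single-threaded ring (`LossyRing` is a hypothetical name, not part of lossyq). Here iter() records the writer's position when it is called, yields only what is still in the ring, and remembers where it stopped, so a later iter() call sees only newer elements. In this model the borrow checker prevents puts while an iterator is alive, so it illustrates the snapshot and the loss of overwritten items, not the concurrency.

```rust
/// Hypothetical single-threaded model of lossy reads (not lossyq's code).
struct LossyRing<T> {
    slots: Vec<T>,    // preallocated elements
    written: usize,   // number of puts so far, used as a sequence number
    read_upto: usize, // sequence number the reader has consumed up to
}

impl<T: Copy> LossyRing<T> {
    fn new(size: usize, default_value: T) -> Self {
        LossyRing { slots: vec![default_value; size], written: 0, read_upto: 0 }
    }

    fn put<F: FnOnce(&mut T)>(&mut self, setter: F) {
        let n = self.slots.len();
        setter(&mut self.slots[self.written % n]);
        self.written += 1;
    }

    /// Iterates over the elements readable right now, oldest first.
    /// Elements overwritten before this call are skipped, i.e. lost.
    fn iter(&mut self) -> impl Iterator<Item = T> + '_ {
        let n = self.slots.len();
        let end = self.written; // snapshot of the writer's position
        // Only the last n elements survive; never return one twice.
        let start = self.read_upto.max(end.saturating_sub(n));
        self.read_upto = end; // the next iter() call starts here
        let slots = &self.slots;
        (start..end).map(move |seq| slots[seq % n])
    }
}
```

Writing 1, 2 and 3 into a ring of two elements and summing the first iterator gives 5, as in the example above; after one more put, a fresh iter() call yields only the new element.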

Rationale

Let me emphasize that the reader may lose updates. I don't believe this is a problem, only a property to live with. Other queue implementations choose either to grow the queue when it becomes full or to block the writer until the reader has consumed some elements. Both are valid choices, and both have consequences. If we allocate more memory for the queue, we may eventually run out of it, start swapping, and drag down the whole system. If we block the writer instead, the writer's performance is limited by the reader's.

I think there are scenarios where we care more about the latest data than about handling out-of-memory or blocking conditions. Heartbeats are one example: we might not care about losing old heartbeats as long as we know the given component was alive a few seconds ago.

Because this queue doesn't allocate memory, its performance is predictable and probably fast. (I deliberately made no measurements whatsoever.)

To minimize the chance of losing items, choose a larger queue size. The right size depends on your application.
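A back-of-the-envelope way to pick the size, assuming the overwrite-oldest behaviour described above: if the writer puts k items between two reads of a queue of size n, the reader sees min(k, n) of them and the rest are lost. The two helpers below are hypothetical, not part of lossyq, and `suggested_size` assumes a steady write rate; bursty writers need more headroom than a percentage margin gives.

```rust
/// Hypothetical sizing helper: with a queue of size `n` and `writes`
/// items put between two reads, returns (delivered, lost) under the
/// overwrite-oldest behaviour.
fn delivered_and_lost(n: usize, writes: usize) -> (usize, usize) {
    let delivered = writes.min(n);
    (delivered, writes - delivered)
}

/// Hypothetical rule of thumb: the number of items expected per read
/// interval (rounded up), plus `headroom_pct` percent extra, at least 1.
fn suggested_size(writes_per_sec: u64, read_interval_ms: u64, headroom_pct: u64) -> u64 {
    // ceil(writes_per_sec * read_interval_ms / 1000)
    let per_interval = (writes_per_sec * read_interval_ms + 999) / 1000;
    // ceil(per_interval * headroom_pct / 100)
    let headroom = (per_interval * headroom_pct + 99) / 100;
    (per_interval + headroom).max(1)
}
```

For the example above, `delivered_and_lost(2, 3)` gives `(2, 1)`: two items reach the reader and one is overwritten. A writer producing 1000 items per second, read every 50 ms with 20% headroom, would get a size of 60.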

Implementation notes

The heart of this queue is the CircularBuffer data structure. It uses atomic integer operations to make sure the writer and the reader can operate concurrently.

```rust
use std::sync::atomic::AtomicUsize;

struct CircularBuffer<T> {
    seqno     : AtomicUsize,      // the ID of the last written item
    data      : Vec<T>,           // (2*n)+1 preallocated elements
    size      : usize,            // n

    buffer    : Vec<AtomicUsize>, // (positions+seqno)[]
    read_priv : Vec<usize>,       // positions that belong to the reader
    write_tmp : usize,            // temporary position where the writer writes first
    max_read  : usize,            // reader's last read seqno
}
```

When writing

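The data vector holds 2n+1 preallocated items: n of them belong to the reader and n+1 belong to the writer. Which side owns which element is tracked by the buffer, read_priv and write_tmp members. The buffer vector represents the circular buffer itself; each of its elements combines 16 bits of the seqno with a position into the data vector, stored in the remaining bits. The write_tmp member is also a position into the data vector. When the writer writes a new element:

1. it writes the new value into data[write_tmp], which only the writer owns;
2. it takes the next sequence number and computes new_pos, the buffer slot that this seqno maps to;
3. it atomically swaps buffer[new_pos] to an entry holding the new seqno and the write_tmp position;
4. the position it gets back from the swap becomes the new write_tmp.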

This design lets the writer always write to a private area that the reader never touches, and then atomically swap the buffer[new_pos] element over to the freshly written element. As a result, writing never interferes with the reader.
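The write path can be sketched as follows. This is a single-threaded model under stated assumptions, not lossyq's source: `WriterModel`, `pack` and `unpack` are hypothetical names, the reader's read_priv is left out, and the bit layout (low 16 bits for the truncated seqno, the remaining bits for the position) is an assumption, since the text only says that 16 bits hold the seqno. The sketch publishes the buffer entry before advancing the shared seqno, so a reader that sees the new seqno also finds the entry.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumed entry layout (hypothetical): low 16 bits hold the truncated
// seqno, the remaining bits hold a position into `data`.
const SEQ_BITS: u32 = 16;
const SEQ_MASK: usize = (1 << SEQ_BITS) - 1;

fn pack(pos: usize, seqno: usize) -> usize {
    (pos << SEQ_BITS) | (seqno & SEQ_MASK)
}

fn unpack(entry: usize) -> (usize, usize) {
    (entry >> SEQ_BITS, entry & SEQ_MASK)
}

/// Writer half of a hypothetical model of the CircularBuffer design.
struct WriterModel<T> {
    seqno: AtomicUsize,       // last written seqno (0 = nothing yet)
    data: Vec<T>,             // 2n+1 preallocated elements
    size: usize,              // n
    buffer: Vec<AtomicUsize>, // n packed (position, seqno) entries
    write_tmp: usize,         // the writer's private position
}

impl<T: Copy> WriterModel<T> {
    fn new(size: usize, default_value: T) -> Self {
        // Positions 0..n start out in `buffer`, n..2n would be the reader's
        // read_priv (not modelled here), and 2n is the writer's private slot.
        let buffer = (0..size).map(|i| AtomicUsize::new(pack(i, 0))).collect();
        WriterModel {
            seqno: AtomicUsize::new(0),
            data: vec![default_value; 2 * size + 1],
            size,
            buffer,
            write_tmp: 2 * size,
        }
    }

    /// Writes one element and returns the seqno assigned to it.
    fn put<F: FnOnce(&mut T)>(&mut self, setter: F) -> usize {
        // 1. Fill the private element; the reader never looks at it.
        setter(&mut self.data[self.write_tmp]);
        // 2. Next seqno and the buffer slot it maps to (single writer,
        //    so a plain load is enough).
        let seqno = self.seqno.load(Ordering::Relaxed) + 1;
        let new_pos = seqno % self.size;
        // 3. Publish the element with one atomic swap...
        let old = self.buffer[new_pos].swap(pack(self.write_tmp, seqno), Ordering::AcqRel);
        // ...then advance the shared seqno so readers look for it.
        self.seqno.store(seqno, Ordering::Release);
        // 4. The displaced element becomes the next private slot.
        self.write_tmp = unpack(old).0;
        seqno
    }
}
```

With n = 2, three puts of 10, 20 and 30 leave buffer[0] pointing at 20 (seqno 2) and buffer[1] pointing at 30 (seqno 3); the element holding 10 went back to the writer as its private slot.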

When reading

To read data, one needs to obtain an iterator through the iter() function. It loops through the buffer in reverse order and atomically swaps the reader's own positions, held in the read_priv vector, with the position part of each buffer entry. While looping, it checks that the sequence number part of the buffer entry is the expected one. If it is not, the writer has wrapped around, so that element should be returned by the next iter() call, and the loop stops.

As a result, the read_priv vector holds the positions of the previously written elements, and the reader has given its own elements to the writer in exchange, so the writer can overwrite those while the reader works with its own copies.
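The read step can be sketched the same way. `claim_readable` is a hypothetical function, not lossyq's API, and it reuses the assumed layout of 16 seqno bits plus a position. One deliberate deviation: it uses `compare_exchange` instead of a plain swap, so an entry the writer has already replaced with a newer seqno is left in place for the next read rather than taken. The claimed positions end up in read_priv, newest first, while the reader's old positions go back into the buffer for the writer to reuse.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumed entry layout (hypothetical): low 16 bits hold the truncated
// seqno, the remaining bits hold a position into the data vector.
const SEQ_BITS: u32 = 16;
const SEQ_MASK: usize = (1 << SEQ_BITS) - 1;

fn pack(pos: usize, seqno: usize) -> usize {
    (pos << SEQ_BITS) | (seqno & SEQ_MASK)
}

fn unpack(entry: usize) -> (usize, usize) {
    (entry >> SEQ_BITS, entry & SEQ_MASK)
}

/// Hypothetical read step. Walks the buffer from the newest seqno
/// backwards and trades reader-owned positions (read_priv) for the
/// positions of unread entries. Returns how many entries were claimed;
/// their positions are in read_priv[..count], newest first.
fn claim_readable(
    buffer: &[AtomicUsize],
    read_priv: &mut [usize],
    seqno: &AtomicUsize,
    max_read: &mut usize,
) -> usize {
    let n = buffer.len();
    let latest = seqno.load(Ordering::Acquire);
    // Only the last n seqnos can still be in the ring, and nothing at or
    // below max_read is returned twice.
    let oldest = (*max_read + 1).max((latest + 1).saturating_sub(n));
    let mut count = 0;
    for expected in (oldest..=latest).rev() {
        let slot = &buffer[expected % n];
        let current = slot.load(Ordering::Acquire);
        let (pos, seq) = unpack(current);
        if seq != (expected & SEQ_MASK) {
            // The writer wrapped around into this slot: leave the newer
            // entry for the next read and stop.
            break;
        }
        // Hand over a reader-owned position, keeping the seqno.
        let give = pack(read_priv[count], expected);
        if slot
            .compare_exchange(current, give, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            break; // the writer replaced the entry after our load
        }
        read_priv[count] = pos;
        count += 1;
    }
    *max_read = latest;
    count
}
```

Wrap-around of the truncated 16-bit seqno is ignored here; a real implementation has to handle it.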

License

Licensed under either the MIT or the Apache-2.0 license, at your option.