Time Key Stream Set

This crate provides a hybrid in/out-of-memory data structure for deduplicating mostly ordered streams of time keys.

Goals

There are several priority features that motivate the design and development of this crate.

Motivational Heuristics

There are several veteran structures in this space, such as B+ Trees and Log Structured Merge (LSM) Trees. Because this is a specialized use case, we need not suffer some of the trade-offs involved in the generalization of these structures. Several features of the incoming data stream are used to optimize the average case.

  1. Inserts are uploaded in batches from Wearable/Mobile devices
    1. Batch sizes of ~5000 consecutive time keys are the standard case
  2. For each userid/deviceid/source combination
    1. Consecutive timestamps (in microseconds) will be separated by 100s to 1000000s of microseconds.
    2. Incoming data will rarely go backwards. When it does, it is almost always because saved data is being read out again from the beginning and then progresses forward.
  3. Row data associated with the time keys are not stored with the time key stream set.
  4. Truncation and reclamation of the stream set resources may happen at relatively infrequent intervals, for example, once hourly.

A B+ Tree is reasonably well suited for such a scenario, except for two key distinctions. The first is the lack of a viable crate implementation for out-of-memory (OOM) storage. The second is the lack of optimization for contiguous block insertion of u128 time keys. A similar lack of support for LSM trees motivates the implementation of the following algorithm.
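To make the idea of a u128 time key concrete, here is a minimal sketch of one way such a key could be packed. The bit layout (user id and device id in the high bits, microsecond timestamp in the low 64 bits), the unsigned field types, and the names `pack_time_key` and `unpack_time_key` are assumptions for illustration only; the crate's actual encoding may differ.

```rust
/// Hypothetical layout: [user_id: 32 bits][device_id: 32 bits][ts_us: 64 bits].
/// Placing the timestamp in the low bits means keys from the same
/// user/device sort by time, so a batch forms a contiguous key range.
fn pack_time_key(user_id: u32, device_id: u32, ts_us: u64) -> u128 {
    ((user_id as u128) << 96) | ((device_id as u128) << 64) | (ts_us as u128)
}

/// Inverse of `pack_time_key`: split a key back into its three fields.
fn unpack_time_key(key: u128) -> (u32, u32, u64) {
    ((key >> 96) as u32, (key >> 64) as u32, key as u64)
}
```

Under this assumed layout, `pack_time_key(7, 3, 1_000_000)` compares less than `pack_time_key(7, 3, 1_000_100)`, which is the property that makes consecutive batches cheap to insert into an ordered index.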

Approach

When flushed to disk, segments are compressed with tsz and then zstd. For some data sets, sparse segments with consistent data rates reach compression ratios of 1000x to 200000x. Segments are accessed serially, with insertion rates dependent on the backing index. The current index, a BTreeMap, yields insertion rates on the order of ~300000 time keys per second (single core, CPU bound).
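The very high ratios for consistent data rates are plausible if the first compression stage uses delta-of-delta encoding, as Gorilla-style time series compressors do. The sketch below assumes that is the case and only illustrates the principle; it is not the tsz bit-level format, and `delta_of_delta` is a hypothetical helper.

```rust
/// Second-order differences of a timestamp sequence.
/// For evenly spaced timestamps every output value is zero, which a
/// bit-packing stage followed by zstd can shrink to almost nothing.
fn delta_of_delta(ts: &[i64]) -> Vec<i64> {
    // First-order deltas: the gap between consecutive timestamps.
    let deltas: Vec<i64> = ts.windows(2).map(|w| w[1] - w[0]).collect();
    // Second-order deltas: how much each gap differs from the previous one.
    deltas.windows(2).map(|w| w[1] - w[0]).collect()
}
```

A stream sampled every 100 microseconds produces only zeros here, while a single late sample shows up as one non-zero value.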

Each insert follows this pseudocode:

  1. Compute the segment start bucket timestamp for each row
  2. Lock the segment index
  3. Find the existing segments in range for bucket timestamps under insertion
  4. Create any new segments not yet in the index
  5. Lock the hot segment cache
  6. Insert new segments into the index and the cache
  7. Loop on LRU eviction of segments not under insertion and hydration of segments under insertion, respecting the requested memory limit
  8. Release the hot segment cache lock
  9. Lock each segment under insertion
  10. Release the segment index lock
  11. Insert time keys into their respective segments
  12. Release each segment lock
  13. Update memory usage for newly inserted time keys
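
The lock ordering in the steps above can be sketched with standard library types. This is a simplified illustration, not the crate's implementation: `SketchSet` and `Segment` are hypothetical names, keys are plain `u64` microsecond timestamps, segments live only in memory, and the hot segment cache (steps 5 to 8) and memory accounting (step 13) are left out.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};

/// Hypothetical segment: the time keys that fall into one time bucket.
type Segment = Arc<Mutex<BTreeSet<u64>>>;

/// Hypothetical stream set holding only the segment index.
struct SketchSet {
    interval_us: u64,
    index: Mutex<BTreeMap<u64, Segment>>,
}

impl SketchSet {
    fn new(interval_us: u64) -> Self {
        Self { interval_us, index: Mutex::new(BTreeMap::new()) }
    }

    /// Returns one flag per key: `true` if the key was new, `false` if it
    /// was a duplicate (an assumed convention for this sketch).
    fn insert(&self, keys: &[u64]) -> Vec<bool> {
        // Step 1: compute the segment start bucket for each key.
        let buckets: Vec<u64> = keys.iter().map(|&k| k - k % self.interval_us).collect();

        // Step 2: lock the segment index.
        let mut index = self.index.lock().unwrap();

        // Steps 3, 4 and 6: look up existing segments, creating missing ones.
        let touched: BTreeMap<u64, Segment> = buckets
            .iter()
            .map(|b| (*b, index.entry(*b).or_default().clone()))
            .collect();

        // Step 9: lock each segment under insertion, in ascending bucket order.
        let mut guards: BTreeMap<_, _> = touched
            .iter()
            .map(|(b, seg)| (*b, seg.lock().unwrap()))
            .collect();

        // Step 10: release the index lock; other buckets are usable again.
        drop(index);

        // Step 11: insert each key into its segment, recording novelty.
        let flags: Vec<bool> = keys
            .iter()
            .zip(&buckets)
            .map(|(k, b)| guards.get_mut(b).unwrap().insert(*k))
            .collect();

        // Step 12: release each segment lock.
        drop(guards);
        flags
    }
}
```

Taking the segment locks before releasing the index lock means no other writer can slip in between lookup and insertion, while the index itself is held only for the short lookup phase.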

Interface

In Rust, using the Time Key Stream Set, TkStreamSet, looks like the following:

```rust
use std::error::Error;
use std::time::Duration;
use time_key_stream_set::prelude::*;

async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
    // Who knows, maybe you get this data on an HTTP server
    unimplemented!()
}

async fn yolo() -> Result<(), Box<dyn Error>> {
    // Hope the database doesn't rollback this transaction
    unimplemented!()
}

#[derive(IntoTimeKey)]
struct AdcRow {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
    millivolts: i16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Rehydrate a set of time keys from a known directory or a temporary directory.
    // Configure a stream set via a builder with reasonable defaults for the data source.
    let stream_set = TkStreamSetBuilder::new()
        .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
        .with_memory_limit(MemoryLimit::Low)
        .build()
        .await
        .unwrap();

    // Multiple threads could be running the same loop on this node
    loop {
        // I got some new data that may have duplicates
        let batch: Vec<AdcRow> = frontend_receive().await?;

        // We are going to keep this data, glad RAM doesn't blow up
        let dups = stream_set
            .insert(batch.iter().map(|row| row.into()).collect::<Vec<_>>())
            .await?;
        let dedups = batch
            .iter()
            .zip(dups)
            .filter_map(|(row, keep)| if keep { Some(row) } else { None });

        // Send to the data warehouse somewhere
        loop {
            match yolo().await {
                Ok(_) => break,
                Err(e) => todo!(),
            }
        }
    }
}
```