Redis Stream

Build Status crates.io

A Rust high-level library to consume data from Redis streams.

This project is a slightly modified port of the Elixir Redix.Stream library to Rust and comes as an extension of redis-rs.

We use it at Klaxit to process the combined log streams from Heroku's Logplex and automatically fix some performance issues that sometimes happen on Heroku even before our users notice them.

We also use it to scale some services when needed.

It's been running in production with great success for more than 6 months.

Installation

The crate is called redis-stream and you can depend on it via cargo:

ini [dependencies] redis-stream = "0.1.2"

Documentation

Documentation on the library can be found at docs.rs/redis-stream.

Basic usage:

```rust use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redisurl = std::env::var("REDISURL").unwraporelse(|| "redis://127.0.0.1:6379".tostring());

let mut redis = redis::Client::open(redisurl) .expect("client") .getconnection() .expect("connection");

// Message handler let handler = |_id: &str, message: &Message| { // do something Ok(()) };

// Consumer config let opts = ConsumerOpts::default(); let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");

// Consume some messages through handler. consumer.consume().expect("consume messages");

// Clean up redis use redis::Commands; redis.del::<&str, bool>("my-stream").expect("del"); ```

Consumer groups usage:

```rust use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redisurl = std::env::var("REDISURL").unwraporelse(|| "redis://127.0.0.1:6379".tostring());

let mut redis = redis::Client::open(redisurl) .expect("client") .getconnection() .expect("connection");

// Message handler let handler = |_id: &str, message: &Message| { // do something Ok(()) };

// Consumer config let opts = ConsumerOpts::default().group("my-group", "worker.1"); let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();

// Consume some messages through handler. consumer.consume().expect("consume messages");

// Clean up redis use redis::Commands; redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy"); redis.del::<&str, bool>("my-stream-2").expect("del"); ```

Development

If you want to develop on the library, there are a few commands provided by the makefile.

Run make help to get more info.

For testing, a docker-compose.yml file is also available if you need to start a local redis instance:

sh $ docker-compose up -d $ make test

License

Please see LICENSE