redis-event

Build Status codecov Crates.io Crates.io

用于监听Redis的写入操作,据此可以实现数据复制,监控等相关的应用。

[dependencies] redis-event = "1.0.3"

原理

此crate实现了Redis Replication协议,在运行时,程序将以replica的身份连接到Redis,相当于Redis的一个副本。

所以,在程序连接上某个Redis之后,Redis会将它当前的所有数据以RDB的格式dump一份,dump完毕之后便发送过来,这个RDB中的每一条数据就对应一个Event::RDB事件。

在这之后,Redis接收到来自客户端的写入操作(即Redis命令)后,也会将这个写入操作传播给它的replica,每一个写入操作就对应一个Event::AOF事件。

示例

```rust use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::str::FromStr; use std::rc::Rc; use std::cell::RefCell; use std::io; use redisevent::listener; use redisevent::config::Config; use redis_event::{NoOpEventHandler, RedisListener};

fn main() -> io::Result<()> { let ip = IpAddr::from_str("127.0.0.1").unwrap(); let port = 6379;

let conf = Config {
    is_discard_rdb: false,            // 不跳过RDB
    is_aof: false,                    // 不处理AOF
    addr: SocketAddr::new(ip, port),
    password: String::new(),          // 密码为空
    repl_id: String::from("?"),       // replication id,若无此id,设置为?即可
    repl_offset: -1,                  // replication offset,若无此offset,设置为-1即可
    read_timeout: None,               // None,即读取永不超时
    write_timeout: None,              // None,即写入永不超时
};
let running = Arc::new(AtomicBool::new(true));
let mut redis_listener = listener::new(conf, running);
// 设置事件处理器
redis_listener.set_event_handler(Rc::new(RefCell::new(NoOpEventHandler{})));
// 启动程序
redis_listener.start()?

} ```

Module

Module解析示例

```rust use std::any::Any; use redisevent::rdb::Module; use redisevent::ModuleParser;

[derive(Debug)]

struct HelloModule { pub values: Vec }

impl Module for HelloModule { fn as_any(&self) -> &dyn Any { self } }

struct HelloModuleParser {}

impl ModuleParser for HelloModuleParser { fn parse(&mut self, input: &mut dyn Read, modulename: &str, moduleversion: usize) -> Box { let elements = self.loadunsigned(input, moduleversion); let elements = elements.tou32().unwrap();

    let mut array = Vec::new();
    for _ in 0..elements {
        let val = self.load_signed(input, module_version);
        array.push(val);
    }

    Box::new(HelloModule { values: array })
}

}

fn main() { // 省略其他代码... let parser = Rc::new(RefCell::new(HelloModuleParser {})); let mut redislistener = redisevent::listener::new(conf, running); redislistener.setmodule_parser(parser); } ```

处理Module

rust impl EventHandler for EventHandlerImpl { fn handle(&mut self, event: Event) { match event { Event::RDB(rdb) => { match rdb { Object::Module(_, module) => { let hello_module: &HelloModule = match module.as_any().downcast_ref::<HelloModule>() { Some(hello_module) => hello_module, None => panic!("not HelloModule") }; // ... } _ => {} } } _ => {} } } }