Build Status Crates.io

nsqueue

A Tokio-based client implementation for the NSQ realtime message processing system.

WORK IN PROGRESS

Current features

Launch NSQ

```
$ nsqlookupd &
$ nsqd --lookupd-tcp-address=127.0.0.1:4160 &
$ nsqadmin --lookupd-http-address=127.0.0.1:4161 &
```

MPUB

```rust
extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::Future;
use tokio_core::reactor::Core;

use nsqueue::config::*;
use nsqueue::producer::*;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let addr = "127.0.0.1:4150".parse().unwrap();

    let mut messages: Vec<String> = Vec::new();
    messages.push("First message".into());
    messages.push("Second message".into());

    let res = Producer::connect(&addr, &handle, Config::default())
        .and_then(|conn| {
            conn.mpublish("some_topic".into(), messages)
                .and_then(move |response| {
                    println!("Response: {:?}", response);
                    Ok(())
                })
        });
    core.run(res).unwrap();
}
```

SUB

```rust
extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::{Stream, Future};
use tokio_core::reactor::Core;

use nsqueue::config::*;
use nsqueue::consumer::*;
use nsqueue::response::NSQ;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let addr = "127.0.0.1:4150".parse().unwrap();

    core.run(
        Consumer::connect(&addr, &handle, Config::default())
        .and_then(|conn| {
            conn.subscribe("some_topic".into(), "some_channel".into())
            .and_then(move |message| {
                match message {
                    NSQ::Stream(response) => {
                        let ret = response.for_each(move |message| {
                            if message.message_id == "_heartbeat_" {
                                conn.nop();
                            } else {
                                println!("Response {:?} {:?}", message.message_id, message.message_body);
                                conn.fin(message.message_id); // Inform NSQ (Message consumed)
                            }
                            Ok(())
                        });
                        ret
                    }
                }
            })
        })
    ).unwrap();
}
```

License

Licensed under either of