Build Status Crates.io

nsqueue

A Tokio based client implementation for the NSQ realtime message processing system

WORK IN PROGRESS

Current features

Launch NSQ

$ nsqlookupd & $ nsqd --lookupd-tcp-address=127.0.0.1:4160 & $ nsqadmin --lookupd-http-address=127.0.0.1:4161 &

PUB

``` extern crate futures; extern crate tokio_core; extern crate nsqueue;

use futures::Future; use tokio_core::reactor::Core;

use nsqueue::config::; use nsqueue::producer::;

fn main() { let mut core = Core::new().unwrap(); let handle = core.handle();

 let addr = "127.0.0.1:4150".parse().unwrap();

 let res = Producer::connect(&addr, &handle, Config::default())
    .and_then(|conn| {
        conn.publish("some_topic".into(), "some_message".into())
        .and_then(move |response| {
           println!("Response: {:?}", response);
           Ok(())
        })
    });
 core.run(res).unwrap();

} ```

SUB

``` extern crate futures; extern crate tokio_core; extern crate nsqueue;

use futures::{Stream, Future}; use tokio_core::reactor::Core;

use nsqueue::config::; use nsqueue::consumer::; use nsqueue::response::NSQ;

fn main() { let mut core = Core::new().unwrap(); let handle = core.handle();

 let addr = "127.0.0.1:4150".parse().unwrap();

 core.run(
     Consumer::connect(&addr, &handle, Config::default())
     .and_then(|conn| {
        conn.subscribe("some_topic".into(), "some_channel".into())
        .and_then(move |message| {
            match message {
                NSQ::Stream(response) => {
                    let message_id = response.header.clone();
                    let ret = response.for_each(move |message| {
                        println!("Response: {:?}", message);
                        conn.fin(message_id.clone()) // Inform NSQ (Message consumed) 
                    });
                    ret
                }
            }
        })
     })
 ).unwrap();

} ```

License

Licensed under either of