Ratsio

Ratsio is a Rust client library for the NATS messaging system and NATS Event Streaming.

Inspired by nitox and rust-nats, but my project needed NATS streaming, so I couldn't use either of them. If this project is useful to you, feel free to contribute or suggest features. At the moment it only has the features I need.

Add the following to your Cargo.toml.

```toml
[dependencies]
ratsio = "^0.2"
```

Rust -stable, -beta and -nightly are supported.

Features:

Subscribing and Publishing to a NATS subject: see tests/nats_client_test.rs

```rust
let mut runtime = Runtime::new().unwrap();
let options = NatsClientOptions::builder()
    .cluster_uris(vec!("127.0.0.1:4222".to_string()))
    .build()
    .unwrap();

let fut = NatsClient::from_options(options)
    .and_then(|client| NatsClient::connect(&client))
    .and_then(|client| {
        client
            .subscribe(Subscribe::builder().subject("foo".into()).build().unwrap())
            .map_err(|_| RatsioError::InnerBrokenChain)
            .and_then(move |stream| {
                let _ = client
                    .publish(Publish::builder().subject("foo".into()).payload(Vec::from(&b"bar"[..])).build().unwrap())
                    .wait();

                stream
                    .take(1)
                    .into_future()
                    .map(|(maybe_message, _)| maybe_message.unwrap())
                    .map_err(|_| RatsioError::InnerBrokenChain)
            })
    });

let (tx, rx) = oneshot::channel();
runtime.spawn(fut.then(|r| tx.send(r).map_err(|e| panic!("Cannot send Result {:?}", e))));
let connection_result = rx.wait().expect("Cannot wait for a result");
let _ = runtime.shutdown_now().wait();
info!(target: "ratsio", "can_sub_and_pub::connection_result {:#?}", connection_result);
assert!(connection_result.is_ok());
let msg = connection_result.unwrap();
assert_eq!(msg.payload, Vec::from(&b"bar"[..]));

```

Subscribing and Publishing to a NATS streaming subject: see tests/stan_client_test.rs

```rust
let mut runtime = Runtime::new().unwrap();
let nats_options = NatsClientOptions::builder()
    .cluster_uris(vec!("127.0.0.1:4222".to_string()))
    .build()
    .unwrap();
let stan_options = StanOptions::builder()
    .nats_options(nats_options)
    .cluster_id("test-cluster")
    .client_id("main-1").build()
    .unwrap();
let (result_tx, result_rx) = mpsc::unbounded();

let (stan_client_tx, stan_client_rx) = oneshot::channel();
let subject: String = "test.subject".into();
let subject1 = subject.clone();
let subject2 = subject.clone();
let program = StanClient::from_options(stan_options)
    .and_then(move |stan_client| {
        let sub = StanSubscribe::builder()
            .subject(subject1.clone())
            .start_position(StartPosition::NewOnly)
            .build().unwrap();
        stan_client
            .subscribe(sub, SyncHandler(Box::new(move |stan_msg: StanMessage| {
                info!(target: "ratsio", " GOT :::: {:?}", stan_msg);
                tokio::spawn(result_tx.clone().send(stan_msg).into_future()
                    .map(|_| ()).map_err(|_| ()));
                Ok(())
            })))
            .and_then(move |_| Ok(stan_client))
    })
    .and_then(move |stan_client| {
        let stan_msg = StanMessage::new(subject2.clone(), Vec::from(&b"hello"[..]));
        stan_client.send(stan_msg).map(|_| stan_client)
    })
    .map_err(|_| ())
    .and_then(|stan_client| {
        stan_client_tx.send(stan_client)
            .into_future()
            .map(|_| ())
            .map_err(|_| ())
    }).map_err(|_| ());

runtime.spawn(program);

let stan_client = stan_client_rx.wait().expect(" No STAN Client");

match result_rx.wait().next().expect("Cannot wait for a result") {
    Ok(stan_msg) => {
        info!(target: "ratsio", "Got stan_msg {:?}", stan_msg);
        assert_eq!(stan_msg.subject, subject);
        assert_eq!(stan_msg.payload, Vec::from(&b"hello"[..]));
    }
    Err(_) => {
        assert!(false);
    }
};

let (close_tx, close_rx) = oneshot::channel();
runtime.spawn(stan_client.close()
    .and_then(|_| close_tx
        .send(true).into_future()
        .map(|_| ())
        .map_err(|_| ()))
);

let _ = close_rx.wait().expect(" Could not close STAN Client");
let _ = runtime.shutdown_now().wait();

```

Important Changes

Version 0.2

Users no longer need to use the ratsio::ops::Connect struct when configuring a connection. Options are now available on NatsClientOptions: username, password, tls_required, auth_token, etc.

For example:

```rust
let nats_options = NatsClientOptions::builder()
    .username("user".to_string())
    .password("password".to_string())
    .cluster_uris(vec!("127.0.0.1:4222".to_string()))
    .build()
    .unwrap();
```

The internal fork of nuid has been upgraded to use rand version ^0.6.

Contact

For bug reports, patches, feature requests or other messages, please send a mail to michael@zulzi.com

License

This project is licensed under the MIT License.