Ratsio is a Rust client library for the NATS messaging system and NATS Event Streaming.
It was inspired by nitox and rust-nats, but my project needed NATS streaming, so I couldn't use either of them. If this project is useful to you, feel free to contribute or suggest features. At the moment it only has the features I need.
Add the following to your Cargo.toml.
```toml
[dependencies]
ratsio = "^0.1"
```
Rust -stable, -beta and -nightly are supported.
Subscribing and publishing to a NATS subject (see `tests/nats_client_test.rs`):

```rust
let mut runtime = Runtime::new().unwrap();
let connect_cmd = Connect::builder().build().unwrap();
let options = NatsClientOptions::builder()
    .connect(connect_cmd)
    .cluster_uris(vec!(String::from("127.0.0.1:4222")))
    .build()
    .unwrap();
let fut = NatsClient::from_options(options)
.and_then(|client| NatsClient::connect(&client))
.and_then(|client| {
client
.subscribe(Subscribe::builder().subject("foo".into()).build().unwrap())
.map_err(|_| RatsioError::InnerBrokenChain)
.and_then(move |stream| {
let _ = client
.publish(Publish::builder().subject("foo".into()).payload(Vec::from(&b"bar"[..])).build().unwrap())
.wait();
stream
.take(1)
.into_future()
.map(|(maybe_message, _)| maybe_message.unwrap())
.map_err(|_| RatsioError::InnerBrokenChain)
})
});
let (tx, rx) = oneshot::channel();
runtime.spawn(fut.then(|r| tx.send(r).map_err(|e| panic!("Cannot send Result {:?}", e))));
let connection_result = rx.wait().expect("Cannot wait for a result");
let _ = runtime.shutdown_now().wait();
info!(target: "ratsio", "can_sub_and_pub::connection_result {:#?}", connection_result);
assert!(connection_result.is_ok());
let msg = connection_result.unwrap();
assert_eq!(msg.payload, Vec::from(&b"bar"[..]));
```
Subscribing and publishing to a NATS streaming subject (see `tests/stan_client_test.rs`):

```rust
let mut runtime = Runtime::new().unwrap();
let nats_options = NatsClientOptions::builder()
    .connect(Connect::builder().build().unwrap())
    .cluster_uris(vec!(String::from("127.0.0.1:4222")))
    .build()
    .unwrap();
let stan_options = StanOptions::builder()
    .nats_options(nats_options)
    .cluster_id("test-cluster".into())
    .client_id("main-1".into()).build()
    .unwrap();
let (result_tx, result_rx) = mpsc::unbounded();
let (stan_client_tx, stan_client_rx) = oneshot::channel();
let subject: String = "test.subject".into();
let subject1 = subject.clone();
let subject2 = subject.clone();
let program = StanClient::from_options(stan_options)
.and_then(move |stan_client| {
let sub = StanSubscribe::builder()
.subject(subject1.clone())
.start_position(StartPosition::NewOnly)
.build().unwrap();
stan_client
.subscribe(sub, SyncHandler(Box::new(move |stan_msg: StanMessage| {
info!(target: "ratsio", " GOT :::: {:?}", stan_msg);
tokio::spawn(result_tx.clone().send(stan_msg).into_future()
.map(|_| ()).map_err(|_| ()));
Ok(())
})))
.and_then(move |_| Ok(stan_client))
})
.and_then(move |stan_client| {
let stan_msg = StanMessage::new(subject2.clone(), Vec::from(&b"hello"[..]));
stan_client.send(stan_msg).map(|_| stan_client)
})
.map_err(|_| ())
.and_then(|stan_client| {
stan_client_tx.send(stan_client)
.into_future()
.map(|_| ())
.map_err(|_| ())
}).map_err(|_| ());
runtime.spawn(program);
let stan_client = stan_client_rx.wait().expect(" No STAN Client");
match result_rx.wait().next().expect("Cannot wait for a result") {
Ok(stan_msg) => {
info!(target: "ratsio", "Got stan_msg {:?}", stan_msg);
assert_eq!(stan_msg.subject, subject);
assert_eq!(stan_msg.payload, Vec::from(&b"hello"[..]));
}
Err(_) => {
assert!(false);
}
};
let (close_tx, close_rx) = oneshot::channel();
runtime.spawn(stan_client.close()
.and_then(|_| close_tx
.send(true).into_future()
.map(|_| ())
.map_err(|_| ()))
);
let _ = close_rx.wait().expect(" Could not close STAN Client");
let _ = runtime.shutdown_now().wait();
```
For bug reports, patches, feature requests, or other messages, please send an email to michael@zulzi.com.
This project is licensed under the MIT License.