Ratsio

Ratsio is a Rust client library for the NATS messaging system and NATS Event Streaming.

Inspired by nitox and rust-nats, but my project needed NATS streaming, so I couldn't use either of them. If this project is useful to you, feel free to contribute or suggest features. At the moment it has just the features I need.

Add the following to your Cargo.toml.

    [dependencies]
    ratsio = "0.3.0-alpha.6"

Rust -stable, -beta and -nightly are supported.

Features:

Subscribing and Publishing to a NATS subject: see examples/nats_subscribe.rs

```rust
use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}

#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();

    // Create the NATS client
    let nats_client = NatsClient::new("nats://localhost:4222").await?;

    // Subscribe to NATS subject 'foo'
    let (sid, mut subscription) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        // Listen for messages on the 'foo' subject.
        // The loop terminates upon un_subscribe.
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" << 1 >> unsubscribed. loop is terminated.")
    });

    // Subscribe to NATS subject 'foo' again, as a second subscription
    let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        // Listen for messages on the 'foo' subject
        while let Some(message) = subscription2.next().await {
            info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
    });

    // Publish some messages, restart the NATS server during this time.
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(5));

    // Publish a message
    let _ = nats_client.publish("foo", b"Publish Message 1").await?;
    thread::sleep(time::Duration::from_secs(1));

    // Unsubscribe the first subscription
    let _ = nats_client.un_subscribe(&sid).await?;
    thread::sleep(time::Duration::from_secs(3));

    // Publish another message; only the second subscription is still active.
    thread::sleep(time::Duration::from_secs(1));
    let _ = nats_client.publish("foo", b"Publish Message 2").await?;
    thread::sleep(time::Duration::from_secs(600));
    info!(" ---- done --- ");
    Ok(())
}
```
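The delivery behaviour that the example above exercises (two subscriptions on the same subject each receive a copy, and a subscription's loop ends once it is unsubscribed) can be modelled without a server. The following is a minimal sketch using only the standard library; `Bus`, `demo` and `drain` are hypothetical names made up for illustration and are not part of ratsio, and the model says nothing about how ratsio or the NATS server implement delivery.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-memory stand-in for a NATS connection (not ratsio's API).
struct Bus {
    next_sid: u64,
    // subject -> list of (subscription id, sender half of its channel)
    subs: HashMap<String, Vec<(u64, Sender<Vec<u8>>)>>,
}

impl Bus {
    fn new() -> Self {
        Bus { next_sid: 0, subs: HashMap::new() }
    }

    /// Register a subscription; returns its id and the receiving end.
    fn subscribe(&mut self, subject: &str) -> (u64, Receiver<Vec<u8>>) {
        let (tx, rx) = channel();
        self.next_sid += 1;
        self.subs
            .entry(subject.to_string())
            .or_default()
            .push((self.next_sid, tx));
        (self.next_sid, rx)
    }

    /// Send a copy of the payload to every subscription on the subject.
    /// Returns the number of successful deliveries.
    fn publish(&self, subject: &str, payload: &[u8]) -> usize {
        self.subs.get(subject).map_or(0, |list| {
            list.iter()
                .filter(|entry| entry.1.send(payload.to_vec()).is_ok())
                .count()
        })
    }

    /// Drop the sender for `sid`, so its receiver sees the end of the stream.
    fn un_subscribe(&mut self, sid: u64) {
        for list in self.subs.values_mut() {
            list.retain(|(id, _)| *id != sid);
        }
    }
}

/// Collect everything a subscription received, decoded for display.
fn drain(rx: Receiver<Vec<u8>>) -> Vec<String> {
    rx.into_iter()
        .map(|p| String::from_utf8_lossy(&p).into_owned())
        .collect()
}

/// Replays the scenario: publish, unsubscribe the first subscription, publish again.
fn demo() -> (Vec<String>, Vec<String>) {
    let mut bus = Bus::new();
    let (sid, sub1) = bus.subscribe("foo");
    let (_sid2, sub2) = bus.subscribe("foo");

    bus.publish("foo", b"Publish Message 1");
    bus.un_subscribe(sid);
    bus.publish("foo", b"Publish Message 2");
    drop(bus); // closes the remaining subscription so draining terminates

    (drain(sub1), drain(sub2))
}

fn main() {
    let (first, second) = demo();
    println!("subscription 1: {:?}", first);
    println!("subscription 2: {:?}", second);
}
```

In this model the first subscription sees only the first message, while the second sees both, which mirrors what the log output of the real example should show against a running server.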

Subscribing and Publishing to a NATS streaming subject: see tests/stan_subscribe.rs

```rust
use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}

#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();

    // Create STAN options
    let client_id = "test1".to_string();
    let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);

    // Create the STAN client
    let stan_client = StanClient::from_options(opts).await?;

    // Subscribe to STAN subject 'foo'
    let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
    tokio::spawn(async move {
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" ----- the subscription loop is done ---- ")
    });

    // Publish some messages to 'foo', use 'cargo run --example stan_publish foo "hi there"'
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(60));

    // Unsubscribe
    let _ = stan_client.un_subscribe(&sid).await;
    thread::sleep(time::Duration::from_secs(10));
    info!(" ---- done --- ");
    Ok(())
}
```
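Both examples above log the payload through `String::from_utf8_lossy`, since a NATS payload is just bytes and is not guaranteed to be valid UTF-8. The short sketch below shows what that standard library call does with valid and invalid input; `payload_to_text` is a hypothetical helper introduced here for illustration, not a ratsio function.

```rust
use std::borrow::Cow;

/// Hypothetical helper: decode a raw payload for logging.
/// Invalid UTF-8 sequences are replaced with U+FFFD instead of failing.
fn payload_to_text(payload: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(payload)
}

fn main() {
    // Valid UTF-8 is borrowed as-is, without allocating a new String.
    let ok = payload_to_text(b"Publish Message 1");
    assert!(matches!(ok, Cow::Borrowed(_)));
    println!("{}", ok);

    // The trailing 0xFF byte is not valid UTF-8 and becomes U+FFFD.
    let bad = payload_to_text(&[b'h', b'i', 0xFF]);
    assert!(matches!(bad, Cow::Owned(_)));
    println!("{}", bad);
}
```

If a subscriber needs to reject malformed text rather than log it, `std::str::from_utf8` returns a `Result` instead of substituting characters.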

Important Changes

Version 0.2

All information related to version 0.2.* is available here: Version 0.2.*.

Version 0.3.0-alpha.1

Breaking API changes from 0.2. This is the first async/await compatible version; it is not production ready yet and is still a work in progress. See the examples in the examples/ folder.

Version 0.3.0-alpha.3

This is the first async/await version that works, but it is still missing features from version 0.2.

Version 0.3.0-alpha.4

Replaced std::sync::RwLock with futures::lock::Mutex for Send + Sync capabilities.

Version 0.3.0-alpha.5

Replaced failure::* with the thiserror crate.

Version 0.3.0-alpha.6

Merged pull requests #12 and #13

Version 0.4.0-alpha.1

Upgraded to tokio 1.0. Merged pull request #21.

Version 0.4.0

Commented out an unused Error, because updating the thiserror version causes a conflicting implementation. Merged pull request #23.

Contact

For bug reports, patches, feature requests or other messages, please send an email to michael@zulzi.com

License

This project is licensed under the MIT License.