This crate bridges Tokio and ZeroMQ to allow for ZeroMQ in the async world.
Currently a WIP
There are two sets of examples, publish/subscribe
and request/response
.
Bring up two terminals and run either:
```sh cargo run --example publish
cargo run --example subcribe ```
Or:
```sh cargo run --example request
cargo run --example response ```
Usage is made to be simple, but opinionated. See the examples for working code, but in general, you need to import tokio
and tmq::*
A request is a Stream
takes an input stream of Messages (using the with
function), sends them to a response socket, and then returns the messages as a stream.
```rust let request = request(&Context::new()) .connect("tcp://127.0.0.1:7899") .expect("Couldn't connect") .with(stream::iterok(vec!["Message1", "Message2", "Message3"].intoiter().map(|val| Message::from(val)))) .foreach(|val| { info!("Response: {}", val.asstr().unwrapor("")); Ok(()) }).maperr(|err| { error!("Error with request: {}", err); });
tokio::run(request); ```
A response socket is a Future
that receives messages, responds to them, and sends them back as per the Responder
implementation:
```rust let responder = respond(&Context::new()) .bind("tcp://127.0.0.1:7899") .expect("Couldn't bind address") .with(|msg: Message| { info!("Request: {}", msg.asstr().unwrapor("")); Ok(msg) }).map_err(|err| { error!("Error from server:{}", err); });
tokio::run(responder); ```
The with
function takes anything that implements the Responder
trait or a closure as above:
``rust
//You can use a struct to respond by implementing the
Responder` trait
pub struct EchoResponder {}
impl Responder for EchoResponder {
type Output = FutureResult
fn respond(&mut self, msg: zmq::Message) -> Self::Output {
return Ok(msg).into();
}
}
//Or you can use a free-floating function
fn echo(msg: zmq::Message) -> impl Future
A publish socket is a Sink
that takes values, and sends them to any subscribe sockets connected (note: ZeroMQ will drop messages if noone is connected).
```rust let mut i = 0;
let broadcast = Interval::newinterval(Duration::frommillis(1000)) .map(move |_| { i += 1; let message = format!("Broadcast #{}", i); Message::from(&message) });
let request = publish(&Context::new()) .bind("tcp://127.0.0.1:7899") .expect("Couldn't bind") .finish() .sendall(broadcast) .map(|| ()) .map_err(|e| { error!("Error publishing:{}", e); });
tokio::run(request); ```
a subscribe socket is a Stream
that reads in values from a publish socket. You specify the filter prefix using the subscribe
method, using ""
for all messages.
rust
let request = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")
.expect("Couldn't connect")
.subscribe("")
.for_each(|val| {
info!("Subscribe: {}", val.as_str().unwrap_or(""));
Ok(())
}).map_err(|e| {
error!("Error Subscribing: {}", e);
});