lapin-async

this library is meant for use in an event loop. The library exposes, through the Connection struct, a state machine you can drive through IO you manage.

Typically, your code would own the socket and buffers, and regularly pass the input and output buffers to the state machine so it receives messages and serializes new ones to send. You can then query the current state and see if it received new messages for the consumers.

Creating a new connection

Set up an AMQP 0.9.1 compliant server. For the purpose of this documentation, we'll assume it is listening on 127.0.0.1:5672.

Create the client socket and some buffers to move data:

```rust extern crate lapin_async as lapin;

use std::net::TcpStream; use lapin::connection::*; use lapin::buffer::Buffer;

fn main() { let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap(); stream.set_nonblocking(true);

let capacity = 8192; let mut sendbuffer = Buffer::withcapacity(capacity as usize); let mut receivebuffer = Buffer::withcapacity(capacity as usize);

} ```

Now, we can create the Connection object:

rust let mut conn: Connection = Connection::new(); loop { match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) { Err(e) => panic!("could not connect: {:?}", e), Ok(ConnectionState::Connected) => break, Ok(state) => println!("now at state {:?}, continue", state), } thread::sleep(time::Duration::from_millis(100)); } println!("CONNECTED");

The run method will repeatedly call 3 other methods:

Calling parse and write_to_stream is how you handle the plumbing for the state machine. Your code should then react to the state changes returned by those functions, or to the specific state of channels and consumers.

In the current case, we wait until the state machine gets to the ConnectionState::Connected state.

Creating a channel

```rust let channelid: u16 = conn.createchannel(); conn.channelopen(channela, "".tostring()).expect("channelopen");

// update state here until: assert!(conn.checkstate(channelid, ChannelState::Connected).unwrap_or(false)); ```

Creating a queue

```rust //create the "hello" queue let requestid: u16 = conn.queuedeclare(channelid, 0, "hello".tostring(), false, false, false, false, false, HashMap::new()).unwrap();

// update state here until: assert!(conn.isfinished(requestid)); ```

Publishing a message

```rust conn.basicpublish(channelid, 0, "".tostring(), "hello".tostring(), false, false).expect("basicpublish"); let payload = b"Hello world!"; conn.sendcontentframes(channela, 60, payload, basic::Properties::default()));

// update state ```

Creating a Consumer

```rust //create the "hello" queue let requestid: u16 = conn.basicconsume(channelid, 0, "hello".tostring(), "myconsumer".tostring(), false, true, false, false, HashMap::new()).expect("basic_consume");

// update state here until: assert!(conn.isfinished(requestid));

// get the next message if let Ok(message) = conn.nextmessage(channelid, "hello", "my_consumer") { // handle message } ```