this library is meant for use in an event loop. The library exposes, through the Connection struct, a state machine you can drive through IO you manage.
Typically, your code would own the socket and buffers, and regularly pass the input and output buffers to the state machine so it receives messages and serializes new ones to send. You can then query the current state and see if it received new messages for the consumers.
```rust use envlogger; use lapinasync as lapin; use crate::lapin::api::ChannelState; use crate::lapin::buffer::Buffer; use crate::lapin::connection::*; use crate::lapin::consumer::ConsumerSubscriber; use crate::lapin::channel::BasicProperties; use crate::lapin::message::Delivery; use crate::lapin::types::FieldTable;
use std::{net::TcpStream, thread, time};
fn main() { env_logger::init();
/* Open TCP connection */ let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap(); stream.set_nonblocking(true).unwrap();
/* Configure AMQP connection */ let capacity = 8192; let mut sendbuffer = Buffer::withcapacity(capacity as usize); let mut receivebuffer = Buffer::withcapacity(capacity as usize); let mut conn: Connection = Connection::new(); conn.setframemax(capacity);
/* Connect tp RabbitMQ server */ asserteq!(conn.connect().unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader)); loop { match conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer) { Err(e) => panic!("could not connect: {:?}", e), Ok(ConnectionState::Connected) => break, Ok(state) => println!("now at state {:?}, continue", state), } thread::sleep(time::Duration::frommillis(100)); } println!("CONNECTED");
/* Adapt our buffer after negocation with the server */ let framemax = conn.configuration.framemax; if framemax > capacity { sendbuffer.grow(framemax as usize); receivebuffer.grow(frame_max as usize); }
/* Create and open a channel */ let channelid = conn.createchannel().unwrap(); conn.channelopen(channelid, "".tostring()).expect("channelopen"); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); thread::sleep(time::Duration::frommillis(100)); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); assert!(conn.checkstate(channelid, ChannelState::Connected).maperr(|e| println!("{:?}", e)).is_ok());
/* Declaire the "hellp" queue */ let requestid = conn.queuedeclare(channelid, 0, "hello".tostring(), false, false, false, false, false, FieldTable::default()).unwrap(); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); thread::sleep(time::Duration::frommillis(100)); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); assert!(conn.isfinished(requestid).unwrapor(false));
/* Publish "Hellow world!" to the "hello" queue */ conn.basicpublish(channelid, 0, "".tostring(), "hello".tostring(), false, false).expect("basicpublish"); let payload = b"Hello world!"; conn.sendcontentframes(channelid, 60, payload, BasicProperties::default()); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); thread::sleep(time::Duration::frommillis(100)); conn.run(&mut stream, &mut sendbuffer, &mut receive_buffer).unwrap();
/* Consumer the messages from the "hello" queue using an instance of Subscriber */ let requestid = conn.basicconsume(channelid, 0, "hello".tostring(), "myconsumer".tostring(), false, true, false, false, FieldTable::default(), Box::new(Subscriber)).expect("basicconsume"); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); thread::sleep(time::Duration::frommillis(100)); conn.run(&mut stream, &mut sendbuffer, &mut receivebuffer).unwrap(); assert!(conn.isfinished(requestid).unwrap_or(false)); }
struct Subscriber;
impl ConsumerSubscriber for Subscriber { fn new_delivery(&mut self, _delivery: Delivery) { // handle message } }
```