A Lapin wrapper that encapsulates the handling of connections and channels and provides helper methods that make Lapin easier to use and less error-prone.
```rust
use amqp_manager::prelude::*;
use futures::FutureExt;
use tokio_amqp::LapinTokioExt;

#[tokio::main]
async fn main() {
    let pool_manager = AmqpConnectionManager::new(
        "amqp://admin:admin@192.168.2.169:5672//".to_string(),
        ConnectionProperties::default().with_tokio(),
    );
    let pool = mobc::Pool::builder().max_open(2).build(pool_manager);
    let amqp_manager = AmqpManager::new(pool).expect("Should create AmqpManager instance");
    let tx_session = amqp_manager
        .get_session_with_confirm_select()
        .await
        .expect("Should create AmqpSession instance");

    let create_queue_op = CreateQueue {
        options: QueueDeclareOptions {
            auto_delete: true,
            ..Default::default()
        },
        ..Default::default()
    };
    let queue = tx_session.create_queue(create_queue_op.clone()).await.expect("create_queue");

    let confirmation = tx_session
        .publish_to_routing_key(PublishToRoutingKey {
            routing_key: queue.name().as_str(),
            payload: "Hello World".as_bytes(),
            ..Default::default()
        })
        .await
        .expect("publish_to_queue");
    assert!(confirmation.is_ack());

    let rx_session = amqp_manager.get_session().await.unwrap();
    rx_session
        .create_consumer_with_delegate(
            CreateConsumer {
                queue_name: queue.name().as_str(),
                ..Default::default()
            },
            move |delivery: DeliveryResult| async {
                if let Ok(Some((channel, delivery))) = delivery {
                    let payload = std::str::from_utf8(&delivery.data).unwrap();
                    assert_eq!(payload, "Hello World");
                    channel
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                        .await;
                }
            },
        )
        .await
        .expect("create_consumer");

    let queue = tx_session.create_queue(create_queue_op.clone()).await.expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}
```
The crate is tested on `ubuntu-latest`
against the following Rust versions: nightly, beta, stable, and 1.45.0.
It may work with older versions as well, but this is not tested.
Please see the documentation of the lapin crate for details about its requirements.