RabbitMQ support for the mobc connection pool
```rust use mobc::Pool; use mobclapin::RMQConnectionManager; use tokioamqp::; use futures::StreamExt; use std::time::Duration; use lapin::{ options::, types::FieldTable, BasicProperties, publisher_confirm::Confirmation, ConnectionProperties, };
const PAYLOAD: &[u8;13] = b"Hello, World!"; const QUEUE_NAME: &str = "test";
async fn main() {
let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
let manager =RMQConnectionManager::new(addr.toowned(), ConnectionProperties::default().withtokio());
let pool = Pool::
let conn = pool.get().await.unwrap();
let channel = conn.create_channel().await.unwrap();
let _ = channel
.queue_declare(
QUEUE_NAME,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await.unwrap();
// send messages to the queue
println!("spawning senders...");
for i in 0..50 {
let send_pool = pool.clone();
let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(200));
loop {
interval.tick().await;
let send_conn = send_pool.get().await.unwrap();
let send_channel = send_conn.create_channel().await.unwrap();
let confirm = send_channel
.basic_publish(
"",
QUEUE_NAME,
BasicPublishOptions::default(),
PAYLOAD.to_vec(),
send_props.clone(),
)
.await.unwrap()
.await.unwrap();
assert_eq!(confirm, Confirmation::NotRequested);
}
});
}
// listen for incoming messages from the queue
let mut consumer = channel
.basic_consume(
QUEUE_NAME,
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await.unwrap();
println!("listening to messages...");
while let Some(delivery) = consumer.next().await {
let (channel, delivery) = delivery.expect("error in consumer");
println!("incoming message from: {:?}", delivery.properties.kind());
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await
.expect("ack");
}
} ```