CI crates.io docs

mobc-lapin

RabbitMQ support for the mobc connection pool

Example

```rust use mobc::Pool; use mobclapin::RMQConnectionManager; use tokioamqp::; use futures::StreamExt; use std::time::Duration; use lapin::{ options::, types::FieldTable, BasicProperties, publisher_confirm::Confirmation, ConnectionProperties, };

const PAYLOAD: &[u8;13] = b"Hello, World!"; const QUEUE_NAME: &str = "test";

[tokio::main]

async fn main() { let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f"; let manager =RMQConnectionManager::new(addr.toowned(), ConnectionProperties::default().withtokio()); let pool = Pool::::builder() .max_open(5) .build(manager);

let conn = pool.get().await.unwrap();
let channel = conn.create_channel().await.unwrap();
let _ = channel
    .queue_declare(
        QUEUE_NAME,
        QueueDeclareOptions::default(),
        FieldTable::default(),
    )
    .await.unwrap();

// send messages to the queue
println!("spawning senders...");
for i in 0..50 {
    let send_pool = pool.clone();
    let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(200));
        loop {
            interval.tick().await;
            let send_conn = send_pool.get().await.unwrap();
            let send_channel = send_conn.create_channel().await.unwrap();
            let confirm = send_channel
                .basic_publish(
                    "",
                    QUEUE_NAME,
                    BasicPublishOptions::default(),
                    PAYLOAD.to_vec(),
                    send_props.clone(),
                )
                .await.unwrap()
                .await.unwrap();
            assert_eq!(confirm, Confirmation::NotRequested);
        }

    });
}

// listen for incoming messages from the queue
let mut consumer = channel
    .basic_consume(
        QUEUE_NAME,
        "my_consumer",
        BasicConsumeOptions::default(),
        FieldTable::default(),
    )
    .await.unwrap();

println!("listening to messages...");
while let Some(delivery) = consumer.next().await {
    let (channel, delivery) = delivery.expect("error in consumer");
    println!("incoming message from: {:?}", delivery.properties.kind());
    channel
        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
        .await
        .expect("ack");
    }

} ```