A lightweight messaging queue for Rust, using Postgres as the backend. Inspired by the RSMQ project.
First, start any Postgres instance. It is the only external dependency.
bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
Example of sending, receiving, and deleting messages from the queue. Typically applications sending messages will not be the same application reading the message.
```rust use pgmq::{Message, PGMQueue}; use serde::{Serialize, Deserialize}; use serde_json::Value;
async fn main() { // CREATE A QUEUE let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".toowned()).await.expect("failed to connect to postgres"); let myqueue = "myqueue".toowned(); queue.create(&myqueue).await.expect("Failed to create queue");
// SEND A `serde_json::Value` MESSAGE
let msg1 = serde_json::json!({
"foo": "bar"
});
let msg_id1: i64 = queue.send(&myqueue, &msg1).await.expect("Failed to enqueue message");
// SEND A STRUCT
#[derive(Serialize, Debug, Deserialize)]
struct MyMessage {
foo: String,
}
let msg2 = MyMessage {
foo: "bar".to_owned(),
};
let msg_id2: i64 = queue.send(&myqueue, &msg2).await.expect("Failed to enqueue message");
// READ A MESSAGE as `serde_json::Value`
let vt: i32 = 30;
let read_msg1: Message<Value> = queue.read::<Value>(&myqueue, Some(&vt)).await.unwrap().expect("no messages in the queue!");
assert_eq!(read_msg1.msg_id, msg_id1);
// READ A MESSAGE as a struct
let read_msg2: Message<MyMessage> = queue.read::<MyMessage>(&myqueue, Some(&vt)).await.unwrap().expect("no messages in the queue!");
assert_eq!(read_msg2.msg_id, msg_id2);
// DELETE THE MESSAGE WE SENT
let deleted = queue.delete(&myqueue, &read_msg1.msg_id).await.expect("Failed to delete message");
let deleted = queue.delete(&myqueue, &read_msg2.msg_id).await.expect("Failed to delete message");
// No messages present aftwe we've deleted all of them
let no_msg: Option<Message<Value>> = queue.read::<Value>(&myqueue, Some(&vt)).await.unwrap();
assert!(no_msg.is_none());
} ```
queue.send()
can be passed any type that implements serde::Serialize
. This means you can prepare your messages as JSON or as a struct.
Reading a message will make it invisible (unavailable for consumption) for the duration of the visibility timeout (vt). No messages are returned when the queue is empty or all messages are invisible.
Messages can be parsed as serde_json::Value or into a struct. queue.read()
returns an Result<Option<Message<T>>, PGMQError>
where T
is the type of the message on the queue. It returns an error when there is an issue parsing the message or if PGMQ is unable to reach postgres.
Note that when parsing into a struct
, the operation will return an error if
parsed as the type specified. For example, if the message expected is
MyMessage{foo: "bar"}
but{"hello": "world"}
is received, the application will panic.
Remove the message from the queue when you are done with it. You can either completely .delete()
, or .archive()
the message. Archived messages are deleted from the queue and inserted to the queue's archive table. Deleted messages are just deleted.
License: MIT