Postgres Message Queue

A lightweight message queue extension for Postgres. Provides similar experience to AWS SQS and Redis Simple Message Queue, but on Postgres.

Installation

TODO docker run ...

Python Examples

Connect to postgres

```python import json import pprint

from sqlalchemy import create_engine, text

engine = createengine("postgresql://postgres:postrgres@localhost:28814/pgxpgmq") ```

Create and list queues

python with engine.connect() as con: # create a queue created = con.execute(text( "select * from pgmq_create('myqueue');")) # list queues list_queues = con.execute(text( "select * from pgmq_list_queues()")) column_names = list_queues.keys() rows = list_queues.fetchall() print("### Queues ###") for row in rows: pprint.pprint(dict(zip(column_names, row))) '### Queues ###' {'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))), 'queue_name': 'myqueue'}

Send a message to the queue

python with engine.connect() as con: # send a message msg = json.dumps({"yolo": 42}) msg_id = con.execute(text(f"select * from pgmq_send('x', '{msg}') as msg_id;")) column_names = msg_id.keys() rows = msg_id.fetchall() print("### Message ID ###") for row in rows: pprint.pprint(dict(zip(column_names, row))) '### Message ID ###' {'msg_id': 1}

Read a message from the queue

python with engine.connect() as con: # read a message, make it unavailable to be read again for 5 seconds read = con.execute(text("select * from pgmq_read('x', 5);")) column_names = read.keys() rows = read.fetchall() print("### Read Message ###") for row in rows: pprint.pprint(dict(zip(column_names, row))) '### Read Message ###' {'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))), 'message': {'myqueue': 42}, 'msg_id': 1, 'read_ct': 1, 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}

Delete a message from the queue

python with engine.connect() as con: # delete a message deleted = con.execute(text("select pgmq_delete('x', 1);")) column_names = deleted.keys() rows = deleted.fetchall() print("### Message Deleted ###") for row in rows: pprint.pprint(dict(zip(column_names, row))) '### Message Deleted ###' {'pgmq_delete': True}

SQL Examples

sql CREATE EXTENSION pgmq;

Creating a queue

```sql SELECT pgmqcreate('myqueue');

pgmq_create

```

Send a message

```sql pgmq=# SELECT * from pgmqsend('myqueue', '{"foo": "bar"}');

pgmq_send

        1

```

Read a message

Reads a single message from the queue. Make it invisible for 30 seconds. ```sql pgmq=# SELECT * from pgmqread('myqueue', 30);

msgid | readct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+--------------- 1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"} ```

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

```sql pgxpgmq=# SELECT * from pgmqread('myqueue', 30); msgid | readct | vt | enqueuedat | message --------+---------+----+-------------+---------

```

Pop a message

Read a message and immediately delete it from the queue. Returns None if the queue is empty. ```sql pgmq=# SELECT * from pgmqpop('myqueue');

msgid | readct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+--------------- 1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"} ```

Archive a message

Archiving a message removes it from the queue, and inserts it to the archive table. TODO:

Delete a message

Delete a message with id 1 from queue named my_queue. ```sql pgmq=# select pgmqdelete('myqueue', 1);

pgmq_delete

t ```

Development

Setup pgx.

bash cargo install --locked cargo-pgx cargo pgx init

Then, clone this repo and change into this directory.

bash git clone git@github.com:CoreDB-io/coredb.git cd coredb/extensions/pgmq/

Run the dev environment

bash cargo pgx run pg14