A lightweight message queue extension for Postgres. It provides an experience similar to AWS SQS and Redis Simple Message Queue, but on Postgres.
TODO
docker run ...
```python
import json
import pprint

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@localhost:28814/pgx_pgmq")
```
```python
with engine.connect() as con:
    # create a queue
    created = con.execute(text("select * from pgmq_create('myqueue');"))
    # list queues
    list_queues = con.execute(text("select * from pgmq_list_queues()"))
    column_names = list_queues.keys()
    rows = list_queues.fetchall()
    print("### Queues ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
```

```text
### Queues ###
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'queue_name': 'myqueue'}
```
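
The example above pairs column names with row values by hand before printing. A minimal sketch of a helper that factors that step out follows; `rows_to_dicts` is a hypothetical name, not part of pgmq or SQLAlchemy, and it assumes only that it is handed the column names and row tuples that the result object already exposes.

```python
from typing import Any, Iterable, Sequence


def rows_to_dicts(
    column_names: Iterable[str], rows: Iterable[Sequence[Any]]
) -> list[dict[str, Any]]:
    """Pair each row's values with the column names, one dict per row."""
    names = list(column_names)  # materialize once so it can be reused per row
    return [dict(zip(names, row)) for row in rows]


# Stand-in data shaped like one row of a queue listing.
example = rows_to_dicts(["queue_name"], [("myqueue",)])
print(example)
```

With a live connection, the call would presumably look like `rows_to_dicts(result.keys(), result.fetchall())`.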
```python
with engine.connect() as con:
    # send a message to the queue created above
    msg = json.dumps({"yolo": 42})
    msg_id = con.execute(text(f"select * from pgmq_send('myqueue', '{msg}') as msg_id;"))
    column_names = msg_id.keys()
    rows = msg_id.fetchall()
    print("### Message ID ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
```

```text
### Message ID ###
{'msg_id': 1}
```
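
Interpolating the JSON payload into the SQL string breaks as soon as the payload contains a single quote, and it is unsafe for untrusted input. A hedged alternative is to pass the queue name and payload as bound parameters. The sketch below only builds the statement and its parameters; `build_send` is a hypothetical helper, and the `CAST(:msg AS jsonb)` is an assumption about what the driver needs in order to hand the function a JSON value.

```python
import json
from typing import Any


def build_send(queue_name: str, message: Any) -> tuple[str, dict[str, str]]:
    """Return a statement with named placeholders plus the values to bind."""
    # Assumption: the send function accepts (queue name, JSON message).
    sql = "select * from pgmq_send(:queue, CAST(:msg AS jsonb)) as msg_id;"
    params = {"queue": queue_name, "msg": json.dumps(message)}
    return sql, params


sql, params = build_send("myqueue", {"note": "it's quoted safely"})
print(sql)
print(params)
```

With SQLAlchemy, the pair could then be executed as `con.execute(text(sql), params)`.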
```python
with engine.connect() as con:
    # read a message, make it unavailable to be read again for 5 seconds
    read = con.execute(text("select * from pgmq_read('myqueue', 5);"))
    column_names = read.keys()
    rows = read.fetchall()
    print("### Read Message ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
```

```text
### Read Message ###
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'message': {'yolo': 42},
 'msg_id': 1,
 'read_ct': 1,
 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}
```
```python
with engine.connect() as con:
    # delete the message with id 1
    deleted = con.execute(text("select pgmq_delete('myqueue', 1);"))
    column_names = deleted.keys()
    rows = deleted.fetchall()
    print("### Message Deleted ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
```

```text
### Message Deleted ###
{'pgmq_delete': True}
```
```sql
CREATE EXTENSION pgmq;
```
```sql
SELECT pgmq_create('myqueue');
```
```sql
pgmq=# SELECT * from pgmq_send('myqueue', '{"foo": "bar"}');
 1
```
Read a single message from the queue and make it invisible for 30 seconds.

```sql
pgmq=# SELECT * from pgmq_read('myqueue', 30);
 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
```
If the queue is empty, or if all messages are currently invisible, no rows will be returned.
```sql
pgx_pgmq=# SELECT * from pgmq_read('myqueue', 30);
 msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
```
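
The visibility timeout described above can be modeled in a few lines of plain Python. This is a toy in-memory sketch for illustration only, not how pgmq is implemented; `ToyQueue`, `Message`, and their methods are hypothetical names, and the clock is passed in explicitly so the behavior is deterministic.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Message:
    msg_id: int
    message: Any
    vt: float = 0.0   # time at which the message becomes visible again
    read_ct: int = 0  # number of times the message has been read


@dataclass
class ToyQueue:
    messages: list[Message] = field(default_factory=list)
    next_id: int = 1

    def send(self, message: Any) -> int:
        msg = Message(msg_id=self.next_id, message=message)
        self.messages.append(msg)
        self.next_id += 1
        return msg.msg_id

    def read(self, vt_seconds: float, now: float) -> Optional[Message]:
        """Return the oldest visible message and hide it for vt_seconds."""
        for msg in self.messages:
            if msg.vt <= now:
                msg.vt = now + vt_seconds
                msg.read_ct += 1
                return msg
        return None  # empty queue, or every message is still invisible


queue = ToyQueue()
queue.send({"foo": "bar"})
first = queue.read(vt_seconds=30, now=0)    # returned, then hidden until t=30
second = queue.read(vt_seconds=30, now=10)  # still invisible, nothing returned
third = queue.read(vt_seconds=30, now=31)   # timeout elapsed, returned again
print(first is not None, second is None, third.read_ct)
```

The second read returning nothing corresponds to the empty result set shown above: the message still exists, it is just not visible yet.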
Read a message and immediately delete it from the queue. Returns no rows if the queue is empty.
```sql
pgmq=# SELECT * from pgmq_pop('myqueue');
 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
```
Archiving a message removes it from the queue and inserts it into the archive table. TODO:
Delete the message with id `1` from the queue named `myqueue`.
```sql
pgmq=# select pgmq_delete('myqueue', 1);
 t
```
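
Read and delete are commonly combined into a consume step: read a message with a visibility timeout, process it, and delete it only if processing succeeded, so that a failed message becomes visible again once its timeout expires. The sketch below shows that control flow with the database calls injected as plain callables; `consume_one`, `read_fn`, and `delete_fn` are hypothetical names, and in practice they would wrap the read and delete functions shown above.

```python
from typing import Any, Callable, Optional


def consume_one(
    read_fn: Callable[[], Optional[dict[str, Any]]],
    delete_fn: Callable[[int], bool],
    handler: Callable[[Any], None],
) -> Optional[bool]:
    """Process at most one message.

    Returns None if nothing was readable, True if the message was handled
    and deleted, False if the handler failed and the message was left alone.
    """
    row = read_fn()
    if row is None:
        return None
    try:
        handler(row["message"])
    except Exception:
        # Leave the message in place; it reappears after its visibility timeout.
        return False
    return delete_fn(row["msg_id"])


# Stand-ins for the database calls, so the sketch runs without Postgres.
store = {1: {"msg_id": 1, "message": {"foo": "bar"}}}
deleted_ids: list[int] = []


def fake_read() -> Optional[dict[str, Any]]:
    return next(iter(store.values()), None)


def fake_delete(msg_id: int) -> bool:
    deleted_ids.append(msg_id)
    return store.pop(msg_id, None) is not None


result = consume_one(fake_read, fake_delete, handler=print)
print(result, deleted_ids)
```

Deleting only after the handler returns trades possible duplicate processing for not losing messages when a worker crashes mid-task.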
Set up `pgx`.

```bash
cargo install --locked cargo-pgx
cargo pgx init
```
Then, clone this repo and change into this directory.
```bash
git clone git@github.com:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/
```
Run the dev environment
```bash
cargo pgx run pg14
```