Simple database message queue based on bonsaidb.
The project is highly influenced by sqlxmq.
Warning: This project is in early alpha and should not be used in production!
Import the project using:
```toml
bonsaimq = "0.1.0"
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" } ```
Then you can use the message/job queue as follows:
CurrentJob
and return nothing. CurrentJob
allows interfacing the job to retrieve job input or complete the job etc.job_regristy!
needs to be use to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs.Besides the following simple example, see the examples in the examples folder and take a look at the tests.
```rust use bonsaidb::local::{ config::{Builder, StorageConfiguration}, AsyncDatabase, }; use bonsaimq::{jobregistry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; use coloreyre::Result;
/// Example job function. It receives a handle to the current job, which gives /// the ability to get the input payload, complete the job and more. async fn greet(mut job: CurrentJob) { // Load the JSON payload and make sure it is there. let name: String = job.payload_json().expect("input should be given").expect("deserializing"); println!("Hello {name}!"); job.complete().await.expect("access to DB"); }
// The JobRegistry provides a way to spawn new jobs and provides the interface // for the JobRunner to find the functions to execute for the jobs. job_registry!(JobRegistry, { Greetings: "greet" => greet, });
async fn main() -> Result<()> {
// Open a local database for this example.
let dbpath = "simple-doc-example.bonsaidb";
let db = AsyncDatabase::open::
// Start the job runner to execute jobs from the messages in the queue in the
// database.
let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();
// Spawn new jobs via a message on the database queue.
let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;
// Wait for job to finish execution, polling every 100 ms.
bonsaimq::await_job(job_id, 100, &db).await?;
// Clean up.
job_runner.abort(); // Is done automatically on drop.
tokio::fs::remove_dir_all(db_path).await?;
Ok(())
} ```
This projects uses a bunch of clippy lints for higher code quality and style.
Install cargo-lints
using cargo install --git https://github.com/FlixCoder/cargo-lints
. The lints are defined in lints.toml
and can be checked by running cargo lints clippy --all-targets --workspace
.