A SQLite-backed implementation of the job `Queue`.
NOTE: It is possible that a single job gets sent to two runners. This is due to SQLite lacking
row locking and `BEGIN EXCLUSIVE TRANSACTION` not working well (it was very slow) for this use
case. This is only an issue at high concurrency, in which case you probably don't want to use
SQLite in the first place. In other words, this isn't an "Exactly Once" kind of queue.
```sql
CREATE TABLE IF NOT EXISTS adc_queue (
    jid TEXT PRIMARY KEY,
    queue TEXT NOT NULL default 'default',
    job_type TEXT not null,
    payload blob not null,
    retries int not null default 0,
    scheduled_at INTEGER not null,
    started_at INTEGER,
    enqueued_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE TABLE IF NOT EXISTS adc_dead_queue (
    jid TEXT PRIMARY KEY,
    queue TEXT NOT NULL,
    job_type TEXT not null,
    payload blob not null,
    retries int not null,
    scheduled_at INTEGER not null,
    started_at INTEGER not null,
    enqueued_at INTEGER not null,
    died_at INTEGER not null default (strftime('%s', 'now'))
);

CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
    scheduled_at asc,
    started_at asc,
    queue,
    job_type
);
```
The schema is also included in this crate as the `SCHEMA_SQL` constant.
```rust
use aide_de_camp_sqlite::{SqliteQueue, SCHEMA_SQL};
use aide_de_camp::prelude::{Queue, JobHandler, JobRunner, RunnerRouter, Duration, Xid};
use async_trait::async_trait;
use sqlx::SqlitePool;

// Example job type: a unit struct with no fields, used below as the `JobHandler` implementor.
struct MyJob;
impl JobHandler for MyJob {
type Payload = Vec
async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
// Do work here
Ok(())
}
fn name() -> &'static str {
"my_job"
}
}
async fn main() -> Result<(), Box
let pool = SqlitePool::connect(":memory:").await?;
// Setup schema, alternatively you can add schema to your migrations.
sqlx::query(SCHEMA_SQL).execute(&pool).await?;
let queue = SqliteQueue::with_pool(pool);
// Add job the queue to run next
let _jid = queue.schedule::<MyJob>(vec![1,2,3]).await?;
// First create a job processor and router
let router = {
let mut r = RunnerRouter::default();
r.add_job_handler(MyJob);
r
};
// Setup runner to at most 10 jobs concurrently
let mut runner = JobRunner::new(queue, router, 10);
// Poll the queue every second, this will block unless something went really wrong.
// runner.run(Duration::seconds(1)).await?; // Commented to avoid endlessly blocking this doctest.
Ok(())
} ```