Fang

Background task processing library for Rust. It uses a Postgres database as the task queue.

Features

Here are some of fang's key features:

Installation

  1. Add this to your Cargo.toml

the Blocking feature

```toml
[dependencies]
fang = { version = "0.10", features = ["blocking"], default-features = false }
```

the Asynk feature

```toml
[dependencies]
fang = { version = "0.10", features = ["asynk"], default-features = false }
```

Both features

```toml
fang = { version = "0.10" }
```

Supports rustc 1.62+

  2. Create the fang_tasks table in the Postgres database. The migration can be found in the migrations directory.

Usage

Defining a task

Blocking feature

Every task should implement the fang::Runnable trait which is used by fang to execute it.

```rust
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::Error;
use fang::Queueable;
use fang::Runnable;
use fang::Scheduled;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask {
    pub number: u16,
}

#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }

    // If `uniq` is set to true and the task is already in the storage, it won't be inserted again.
    // The existing record will be returned for any insert operation.
    fn uniq(&self) -> bool {
        true
    }

    // This will be useful if you want to filter tasks.
    // The default value is `common`.
    fn task_type(&self) -> String {
        "my_task".to_string()
    }

    // This will be useful if you would like to schedule tasks.
    // The default value is None (the task is not scheduled, it's just executed as soon as it's inserted).
    fn cron(&self) -> Option<Scheduled> {
        let expression = "0/20 * * * Aug-Sep * 2022/1";
        Some(Scheduled::CronPattern(expression.to_string()))
    }

    // The maximum number of retries. Set it to 0 to make the task not retriable.
    // The default value is 20.
    fn max_retries(&self) -> i32 {
        20
    }

    // Backoff mode for retries.
    fn backoff(&self, attempt: u32) -> u32 {
        u32::pow(2, attempt)
    }
}
```

As you can see from the example above, the trait implementation has the #[typetag::serde] attribute, which is used to deserialize the task.

The second parameter of the run function is a struct that implements fang::Queueable. You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it.
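To illustrate what the `uniq` comments in the example describe, here is a minimal in-memory sketch. It is not fang's implementation: `InMemoryQueue`, `StoredTask`, and keying uniqueness on the raw payload string are all assumptions made for illustration (fang stores tasks in Postgres).

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a stored task record.
#[derive(Debug, Clone, PartialEq)]
struct StoredTask {
    id: u64,
    payload: String,
}

// Hypothetical in-memory queue; only models the `uniq` behaviour.
#[derive(Default)]
struct InMemoryQueue {
    tasks: Vec<StoredTask>,
    // Ids of tasks that were inserted with `uniq == true`, keyed by payload.
    uniq_index: HashMap<String, u64>,
}

impl InMemoryQueue {
    fn insert(&mut self, payload: &str, uniq: bool) -> StoredTask {
        if uniq {
            if let Some(&id) = self.uniq_index.get(payload) {
                // Already stored: return the existing record instead of inserting.
                // Ids are 1-based positions in `tasks`.
                return self.tasks[(id - 1) as usize].clone();
            }
        }
        let task = StoredTask {
            id: self.tasks.len() as u64 + 1,
            payload: payload.to_string(),
        };
        if uniq {
            self.uniq_index.insert(payload.to_string(), task.id);
        }
        self.tasks.push(task.clone());
        task
    }
}

fn main() {
    let mut queue = InMemoryQueue::default();
    let first = queue.insert(r#"{"number":1}"#, true);
    let second = queue.insert(r#"{"number":1}"#, true);
    println!(
        "same record: {}, stored tasks: {}",
        first == second,
        queue.tasks.len()
    );
}
```

In this sketch a non-unique insert always creates a new record, even if an identical payload is already stored.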

Asynk feature

Every task should implement the fang::AsyncRunnable trait, which is used by fang to execute it.

Be careful not to give two implementations of the AsyncRunnable trait the same name, because it will cause a failure in the typetag crate.

```rust
use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::AsyncRunnable;
use fang::Error;
use fang::Scheduled;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
    pub number: u16,
}

#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
    async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
        Ok(())
    }

    // This function is optional.
    // The default task_type is `common`.
    fn task_type(&self) -> String {
        "my-task-type".to_string()
    }

    // If `uniq` is set to true and the task is already in the storage, it won't be inserted again.
    // The existing record will be returned for any insert operation.
    fn uniq(&self) -> bool {
        true
    }

    // This will be useful if you would like to schedule tasks.
    // The default value is None (the task is not scheduled, it's just executed as soon as it's inserted).
    fn cron(&self) -> Option<Scheduled> {
        let expression = "0/20 * * * Aug-Sep * 2022/1";
        Some(Scheduled::CronPattern(expression.to_string()))
    }

    // The maximum number of retries. Set it to 0 to make the task not retriable.
    // The default value is 20.
    fn max_retries(&self) -> i32 {
        20
    }

    // Backoff mode for retries.
    fn backoff(&self, attempt: u32) -> u32 {
        u32::pow(2, attempt)
    }
}
```
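To get a feel for the `backoff` formula used in both examples, the standalone sketch below prints the values it produces for the first few attempts. It assumes the returned number is a delay in seconds; `capped_backoff` is a hypothetical variant, not part of fang, that shows one way to bound the delay and avoid overflow for large attempt numbers.

```rust
/// Same formula as the `backoff` method in the examples: 2^attempt.
fn backoff(attempt: u32) -> u32 {
    u32::pow(2, attempt)
}

/// Hypothetical variant: never overflows and never exceeds `cap`.
fn capped_backoff(attempt: u32, cap: u32) -> u32 {
    2u32.checked_pow(attempt).unwrap_or(u32::MAX).min(cap)
}

fn main() {
    for attempt in 1..=5 {
        println!("attempt {attempt}: wait {}", backoff(attempt));
    }
    // 2^40 does not fit in a u32, so the capped variant falls back to the cap.
    println!("attempt 40, capped: {}", capped_backoff(40, 3600));
}
```

With the default of 20 retries the plain formula stays within `u32` (2^20 = 1,048,576), but it would overflow from attempt 32 onward, which is why a cap can be worth considering if you raise `max_retries`.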

In both modules, tasks can be scheduled to be executed once. Use the Scheduled::ScheduleOnce enum variant.

Datetimes and cron patterns are interpreted in the UTC timezone, so to schedule a task in a different timezone you need to apply the offset yourself.

Example:

If your timezone is UTC + 2 and you want to schedule at 11:00:

```rust
let expression = "0 0 9 * * * *";
```
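The offset arithmetic can be sketched as a small helper. `utc_hour` and `daily_at` are hypothetical names, not part of fang, and the sketch assumes a fixed whole-hour offset and the seven-field pattern layout (seconds, minutes, hours, day of month, month, day of week, year) used in the expression above.

```rust
/// Converts a local hour to UTC for a fixed offset in whole hours
/// (for example `2` for UTC+2, `-5` for UTC-5).
fn utc_hour(local_hour: u32, utc_offset_hours: i32) -> u32 {
    (local_hour as i32 - utc_offset_hours).rem_euclid(24) as u32
}

/// Builds a pattern that fires every day at `local_hour`:00:00 local time.
fn daily_at(local_hour: u32, utc_offset_hours: i32) -> String {
    format!("0 0 {} * * * *", utc_hour(local_hour, utc_offset_hours))
}

fn main() {
    // 11:00 in UTC+2 corresponds to 09:00 UTC.
    println!("{}", daily_at(11, 2));
}
```

Note that when the conversion crosses midnight (for example 01:00 in UTC+2 is 23:00 UTC on the previous day), any day-of-month or day-of-week field in the pattern would need to shift as well; this sketch only handles the hour.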

Enqueuing a task

the Blocking feature

To enqueue a task, use Queue::insert_task.

```rust
use fang::Queue;
// brings `insert_task` into scope
use fang::Queueable;

// create an r2d2 pool

// create a fang queue
let queue = Queue::builder().connection_pool(pool).build();

// MyTask from the first example
let task_inserted = queue.insert_task(&MyTask { number: 1 }).unwrap();
```

the Asynk feature

To enqueue a task use AsyncQueueable::insert_task.

For the Postgres backend:

```rust
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;

// Create an AsyncQueue
let max_pool_size: u32 = 2;

let mut queue = AsyncQueue::builder()
    // Postgres database url
    .uri("postgres://postgres:postgres@localhost/fang")
    // Max number of connections that are allowed
    .max_pool_size(max_pool_size)
    .build();

// Always connect first in order to perform any operation
queue.connect(NoTls).await.unwrap();
```

As a simple example, we are using the NoTls type. If you would like to encrypt the connection to Postgres, you can use openssl or native-tls.

```rust
// AsyncTask from the first example
let task = AsyncTask { number: 8 };
let task_returned = queue
    .insert_task(&task as &dyn AsyncRunnable)
    .await
    .unwrap();
```

Starting workers

the Blocking feature

Every worker runs in a separate thread. In case of panic, they are always restarted.

Use WorkerPool to start workers. Use WorkerPool::builder to create your worker pool and run tasks.

```rust
use fang::WorkerPool;
use fang::Queue;

// create a Queue

let mut worker_pool = WorkerPool::<Queue>::builder()
    .queue(queue)
    .number_of_workers(3_u32)
    // if you want to run tasks of the specific kind
    .task_type("my_task_type")
    .build();

worker_pool.start();
```
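The "restarted in case of panic" behaviour can be pictured with plain `std::thread`. The sketch below is only an illustration of the idea, not how fang implements it: `supervise` and its `max_restarts` limit are hypothetical (the text above says fang workers are always restarted, so there is no such limit there).

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Runs `job` on its own thread and respawns it whenever it panics.
/// Returns how many times the worker had to be restarted.
fn supervise<F>(job: F, max_restarts: u32) -> u32
where
    F: Fn() + Send + Sync + 'static,
{
    let job = Arc::new(job);
    let mut restarts = 0;
    loop {
        let job = Arc::clone(&job);
        let handle = thread::spawn(move || (*job)());
        match handle.join() {
            // The worker finished normally.
            Ok(()) => return restarts,
            // The worker panicked: start a fresh thread.
            Err(_) if restarts < max_restarts => restarts += 1,
            // Give up once the (hypothetical) restart budget is used.
            Err(_) => return restarts,
        }
    }
}

fn main() {
    let calls = Arc::new(AtomicU32::new(0));
    let counter = Arc::clone(&calls);
    let restarts = supervise(
        move || {
            // Simulate a job that panics on its first two runs.
            if counter.fetch_add(1, Ordering::SeqCst) < 2 {
                panic!("simulated task failure");
            }
        },
        5,
    );
    println!("worker restarted {restarts} time(s)");
}
```

A panic in the spawned thread surfaces as an `Err` from `join`, which is what lets the supervising loop notice the failure and spawn a replacement.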

the Asynk feature

Every worker runs in a separate tokio task. In case of panic, they are always restarted. Use AsyncWorkerPool to start workers.

```rust
use fang::asynk::async_worker_pool::AsyncWorkerPool;

// Need to create a queue
// Also insert some tasks

let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
    .number_of_workers(max_pool_size)
    .queue(queue.clone())
    // if you want to run tasks of the specific kind
    .task_type("my_task_type")
    .build();

pool.start().await;
```
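The effect of setting a task type on a worker pool can be sketched as a simple filter over pending tasks. This is a conceptual model only: the `Task` struct and `next_task` function are hypothetical, and in fang the selection happens in the database rather than in memory.

```rust
// Hypothetical stand-in for a queued task.
#[derive(Debug, PartialEq)]
struct Task {
    id: u32,
    task_type: String,
}

/// Returns the first pending task whose type matches the worker's type.
fn next_task<'a>(pending: &'a [Task], worker_task_type: &str) -> Option<&'a Task> {
    pending.iter().find(|task| task.task_type == worker_task_type)
}

fn main() {
    let pending = vec![
        Task { id: 1, task_type: "common".to_string() },
        Task { id: 2, task_type: "my_task_type".to_string() },
    ];

    // A worker configured with "my_task_type" skips the `common` task.
    match next_task(&pending, "my_task_type") {
        Some(task) => println!("picked task {}", task.id),
        None => println!("nothing to do"),
    }
}
```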

Check out:

Configuration

Blocking feature

Just use TypeBuilder for WorkerPool.

Asynk feature

Just use TypeBuilder for AsyncWorkerPool.

Configuring the type of workers

Configuring retention mode

By default, all successfully finished tasks are removed from the DB; failed tasks are kept.

There are three retention modes you can use:

```rust
pub enum RetentionMode {
    KeepAll,        // doesn't remove tasks
    RemoveAll,      // removes all tasks
    RemoveFinished, // default value
}
```

Set the retention mode with the worker pool's TypeBuilder in both modules.
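To make the three modes concrete, here is a minimal, self-contained model of the decision they describe. The enum is redeclared locally so the sketch compiles on its own; `Outcome` and `should_remove` are hypothetical names and do not come from fang.

```rust
// Local copy of the enum, so this sketch does not depend on fang.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RetentionMode {
    KeepAll,
    RemoveAll,
    RemoveFinished,
}

// Hypothetical description of how a task ended.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Finished,
    Failed,
}

/// Decides whether a task record is deleted after it has run.
fn should_remove(mode: RetentionMode, outcome: Outcome) -> bool {
    match (mode, outcome) {
        (RetentionMode::KeepAll, _) => false,
        (RetentionMode::RemoveAll, _) => true,
        (RetentionMode::RemoveFinished, Outcome::Finished) => true,
        (RetentionMode::RemoveFinished, Outcome::Failed) => false,
    }
}

fn main() {
    let modes = [
        RetentionMode::KeepAll,
        RetentionMode::RemoveAll,
        RetentionMode::RemoveFinished,
    ];
    for mode in modes {
        for outcome in [Outcome::Finished, Outcome::Failed] {
            println!("{mode:?} + {outcome:?} -> remove: {}", should_remove(mode, outcome));
        }
    }
}
```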

Configuring sleep values

Blocking feature

You can use SleepParams to configure sleep values:

```rust
pub struct SleepParams {
    pub sleep_period: Duration,     // default value is 5 seconds
    pub max_sleep_period: Duration, // default value is 15 seconds
    pub min_sleep_period: Duration, // default value is 5 seconds
    pub sleep_step: Duration,       // default value is 5 seconds
}
```

If there are no tasks in the DB, a worker sleeps for sleep_period. Each time this happens, the value increases by sleep_step until it reaches max_sleep_period. min_sleep_period is the initial value for sleep_period. All values are in seconds.

Use set_sleep_params to set it:

```rust
let sleep_params = SleepParams {
    sleep_period: Duration::from_secs(2),
    max_sleep_period: Duration::from_secs(6),
    min_sleep_period: Duration::from_secs(2),
    sleep_step: Duration::from_secs(1),
};
```

Set the sleep params with the worker pool's TypeBuilder in both modules.
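The way the sleep period grows while the queue stays empty can be simulated without a database. The sketch below redeclares a local `SleepParams` with the same fields; the `increase`, `reset`, and `idle_sleeps` helpers are hypothetical, and two details are assumptions rather than documented behaviour: the period is clamped so it never exceeds `max_sleep_period`, and it returns to `min_sleep_period` once a task is found.

```rust
use std::time::Duration;

// Local copy of the struct, so this sketch does not depend on fang.
#[derive(Debug, Clone)]
struct SleepParams {
    sleep_period: Duration,
    max_sleep_period: Duration,
    min_sleep_period: Duration,
    sleep_step: Duration,
}

impl SleepParams {
    /// After an empty poll: grow the period by one step, up to the maximum.
    fn increase(&mut self) {
        self.sleep_period = (self.sleep_period + self.sleep_step).min(self.max_sleep_period);
    }

    /// Assumed behaviour once a task is found: go back to the minimum.
    fn reset(&mut self) {
        self.sleep_period = self.min_sleep_period;
    }
}

/// Returns the sleep lengths (in seconds) for `polls` consecutive empty polls.
fn idle_sleeps(params: &mut SleepParams, polls: usize) -> Vec<u64> {
    (0..polls)
        .map(|_| {
            let secs = params.sleep_period.as_secs();
            params.increase();
            secs
        })
        .collect()
}

fn main() {
    let mut params = SleepParams {
        sleep_period: Duration::from_secs(2),
        max_sleep_period: Duration::from_secs(6),
        min_sleep_period: Duration::from_secs(2),
        sleep_step: Duration::from_secs(1),
    };
    println!("{:?}", idle_sleeps(&mut params, 6));
    params.reset();
    println!("after reset: {:?}", params.sleep_period);
}
```

Smaller steps make workers react faster after a quiet period, at the cost of polling the database more often.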

Contributing

  1. Fork it!
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Running tests locally

Authors