Background job processing library for Rust. It uses Postgres DB as a task queue.
Add fang to your `Cargo.toml`:

```toml
[dependencies]
fang = "0.3.2"
```

Create the `fang_tasks` table in the Postgres database. The migration can be found in the migrations directory.

Every job should implement the `fang::Runnable` trait, which fang uses to execute it.
```rust
use fang::Error;
use fang::Runnable;
use fang::{Deserialize, Serialize};
use fang::typetag;

#[derive(Serialize, Deserialize)]
struct Job {
    pub number: u16,
}

#[typetag::serde]
impl Runnable for Job {
    fn run(&self) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }
}
```
The trait implementation carries the `#[typetag::serde]` attribute, which is used to deserialize the job.
To enqueue a job, use `Postgres::enqueue_task`:
```rust
use fang::Postgres;

...

Postgres::enqueue_task(&Job { number: 10 }).unwrap();
```
The example above creates a new Postgres connection on every call. To reuse the same connection when enqueuing several jobs, use an instance of the `Postgres` struct:
```rust
let postgres = Postgres::new();

for id in &unsynced_feed_ids {
    postgres.push_task(&SyncFeedJob { feed_id: *id }).unwrap();
}
```
Every worker runs in a separate thread. If a worker panics, it is always restarted.
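To illustrate the restart-on-panic idea, here is a minimal sketch that uses only the standard library. It is not fang's implementation, and fang's own mechanism may differ; `run_with_restarts` and its `max_restarts` cap are hypothetical names introduced for this example.

```rust
use std::thread;

/// Runs `work` in its own thread and starts a fresh thread whenever the
/// previous one panics. Returns how many restarts were needed.
/// `max_restarts` is a hypothetical safety cap so the sketch terminates.
fn run_with_restarts<F>(work: F, max_restarts: u32) -> u32
where
    F: Fn(u32) + Send + Clone + 'static,
{
    let mut restarts = 0;
    loop {
        let job = work.clone();
        let attempt = restarts;
        // `join` returns `Err` when the spawned thread panicked.
        let outcome = thread::spawn(move || job(attempt)).join();
        if outcome.is_ok() || restarts == max_restarts {
            return restarts;
        }
        restarts += 1;
    }
}
```

The closure receives the attempt number, so a caller can simulate a worker that fails on its first runs and then succeeds. A real worker would loop forever instead of returning.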
Use `WorkerPool` to start workers. `WorkerPool::new` accepts one parameter - the number of workers.
```rust
use fang::WorkerPool;

WorkerPool::new(10).start();
```
To configure workers, use `WorkerPool::new_with_params` instead of `WorkerPool::new`, which uses default values. It accepts two parameters - the number of workers and a `WorkerParams` struct.
You can start workers for specific types of tasks. These workers execute only tasks of the specified type.
Add a `task_type` method to the `Runnable` trait implementation:
```rust
...

#[typetag::serde]
impl Runnable for Job {
    fn run(&self) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }

    fn task_type(&self) -> String {
        "number".to_string()
    }
}
```
Set `task_type` in `WorkerParams`:
```rust
let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());

WorkerPool::new_with_params(10, worker_params).start();
```
If `task_type` is not set, workers execute tasks of any type.
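The selection rule can be sketched in a few lines of standalone Rust. This models the behavior described here, not fang's actual query; `QueuedTask` and `next_task` are hypothetical names, and the `"email"` type in the usage note is made up for illustration.

```rust
/// A queued task as this sketch sees it: only its type label matters here.
struct QueuedTask {
    id: u32,
    task_type: String,
}

/// Returns the first task a worker may take. `worker_type` is `None` for a
/// worker without a configured task type, which accepts any task.
fn next_task<'a>(queue: &'a [QueuedTask], worker_type: Option<&str>) -> Option<&'a QueuedTask> {
    queue.iter().find(|task| match worker_type {
        // A typed worker only takes tasks with a matching label.
        Some(wanted) => task.task_type == wanted,
        // An untyped worker takes whatever comes first.
        None => true,
    })
}
```

With a queue holding an `"email"` task followed by a `"number"` task, a worker configured for `"number"` skips the first entry, while a worker with no type takes it.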
By default, all successfully finished tasks are removed from the DB; failed tasks are kept.
There are three retention modes you can use:

```rust
pub enum RetentionMode {
    KeepAll,        // doesn't remove tasks
    RemoveAll,      // removes all tasks
    RemoveFinished, // default value
}
```
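To make the three modes concrete, the sketch below shows one way the removal decision could be expressed. It is an illustration based on the descriptions above, not fang's code: the enum is a local copy, and `Outcome` and `should_remove` are hypothetical names.

```rust
/// Local copy of the retention modes, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RetentionMode {
    KeepAll,
    RemoveAll,
    RemoveFinished,
}

/// Hypothetical result of running a task.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Outcome {
    Finished,
    Failed,
}

/// Decides whether a task row should be deleted after it has run.
fn should_remove(mode: RetentionMode, outcome: Outcome) -> bool {
    match (mode, outcome) {
        (RetentionMode::KeepAll, _) => false,
        (RetentionMode::RemoveAll, _) => true,
        // The default: drop successful tasks, keep failed ones for inspection.
        (RetentionMode::RemoveFinished, Outcome::Finished) => true,
        (RetentionMode::RemoveFinished, Outcome::Failed) => false,
    }
}
```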
Set the retention mode with `set_retention_mode`:
```rust
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);

WorkerPool::new_with_params(10, worker_params).start();
```
You can use `SleepParams` to configure sleep values:
```rust
pub struct SleepParams {
    pub sleep_period: u64,     // default value is 5
    pub max_sleep_period: u64, // default value is 15
    pub min_sleep_period: u64, // default value is 5
    pub sleep_step: u64,       // default value is 5
}
```
If there are no tasks in the DB, a worker sleeps for `sleep_period`, and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds.
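The back-off described above can be sketched as follows. This is a standalone model, not fang's code: the struct is a local copy, `next_sleep` and `reset` are hypothetical method names, and two details are assumptions rather than documented behavior: that the worker sleeps first and increases afterwards, and that the period falls back to `min_sleep_period` once a task is found.

```rust
/// Local copy of the sleep settings, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct SleepParams {
    sleep_period: u64,
    max_sleep_period: u64,
    min_sleep_period: u64,
    sleep_step: u64,
}

impl SleepParams {
    /// Returns how long to sleep after an empty poll, then lengthens the
    /// next sleep by `sleep_step`, capped at `max_sleep_period`.
    fn next_sleep(&mut self) -> u64 {
        let current = self.sleep_period;
        self.sleep_period = current
            .saturating_add(self.sleep_step)
            .min(self.max_sleep_period);
        current
    }

    /// Assumed behavior: go back to the minimum once a task was found.
    fn reset(&mut self) {
        self.sleep_period = self.min_sleep_period;
    }
}
```

With the default values (5, 15, 5, 5), successive empty polls in this model sleep for 5, 10, 15, and then 15 seconds.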
Use `set_sleep_params` to set them:
```rust
let sleep_params = SleepParams {
    sleep_period: 2,
    max_sleep_period: 6,
    min_sleep_period: 2,
    sleep_step: 1,
};
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);

WorkerPool::new_with_params(10, worker_params).start();
```
Fang can add tasks to `fang_tasks` periodically. To use this feature, first run the migration that creates the `fang_periodic_tasks` table.
Usage example:
```rust
use fang::Scheduler;
use fang::Postgres;

let postgres = Postgres::new();

postgres
    .push_periodic_task(&SyncJob::default(), 120)
    .unwrap();

postgres
    .push_periodic_task(&DeliverJob::default(), 60)
    .unwrap();

Scheduler::start(10, 5);
```
In the example above, `push_periodic_task` saves the specified task to the `fang_periodic_tasks` table. The task is then enqueued (saved to the `fang_tasks` table) every specified number of seconds.
`Scheduler::start(10, 5)` starts the scheduler. It accepts two parameters:

- DB check period in seconds
- Acceptable error limit in seconds - `|current_time - scheduled_time| < error`
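The error-limit condition can be written as a small predicate. This is a sketch of the inequality above under the assumption that times are plain second counts; `is_due` is a hypothetical name, not part of fang's API.

```rust
/// Whether a task scheduled for `scheduled_time` falls inside the acceptable
/// window when the scheduler checks at `current_time`. All values are in
/// seconds; the comparison is strict, as in `|current - scheduled| < error`.
fn is_due(current_time: i64, scheduled_time: i64, error: i64) -> bool {
    (current_time - scheduled_time).abs() < error
}
```

With an error limit of 5, a check that happens 3 seconds after (or 4 seconds before) the scheduled time still counts as on time, while a check exactly 5 seconds off does not.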
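To contribute:

- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)

Author: Ayrat Badykov (@ayrat555)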