A worker threadpool used to execute a number of jabs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.
A single Worker
runs in its own thread, to be implemented according to the trait:
```rust pub trait Worker { type Input: Send; type Output: Send;
fn new() -> Self;
fn execute(&mut self, Self::Input) -> Self::Output;
} ```
This crate provides Pool<W> where W: Worker
. With a pool, there are four
primary functions of interest:
- Pool::<MyWorker>::new(n_threads)
creates a new pool for a particular Worker
.
- pool.execute(inp)
non-blocking executes the worker and ignores the return value.
- pool.execute_to(tx, inp)
non-blocking executes the worker and sends return value to
the given Sender.
- pool.join()
blocking waits for all tasks (from execute
and
execute_to
) to complete.
A worker is provided in workerpool::thunk
, a stateless ThunkWorker<T>
.
It executes on inputs of Thunk<T>
, effectively argumentless functions that
are Sized + Send
. These thunks are creates by wrapping functions which
return T
with Thunk::of
.
```rust extern crate workerpool;
use workerpool::Pool; use workerpool::thunk::{Thunk, ThunkWorker}; use std::sync::mpsc::channel;
fn main() {
let nworkers = 4;
let njobs = 8;
let pool = Pool::
let (tx, rx) = channel();
for _ in 0..n_jobs {
pool.execute_to(tx.clone(), Thunk::of(|| 1i32));
}
assert_eq!(8, rx.iter().take(n_jobs).fold(0, |a, b| a + b));
} ```
For stateful workers, you have to implement Worker
yourself.
Suppose there's a line-delimited process, such as cat
or tr
, which you'd
like running on many threads for use in a pool-like manner. You may create
and use a worker, with maintained state of the stdin/stdout for the process,
as follows:
```rust extern crate workerpool;
use workerpool::{Worker, Pool}; use std::process::{Command, ChildStdin, ChildStdout, Stdio}; use std::io::prelude::*; use std::io::{self, BufReader}; use std::sync::mpsc::channel;
struct LineDelimitedProcess {
stdin: ChildStdin,
stdout: BufReader
fn new() -> Self {
let child = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
Self {
stdin: child.stdin.unwrap(),
stdout: BufReader::new(child.stdout.unwrap()),
}
}
fn execute(&mut self, inp: Self::Input) -> Self::Output {
self.stdin.write_all(&*inp)?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut s = String::new();
self.stdout.read_line(&mut s)?;
s.pop(); // exclude newline
Ok(s)
}
}
fn main() {
let nworkers = 4;
let njobs = 8;
let pool = Pool::
let (tx, rx) = channel();
for i in 0..n_jobs {
let inp = Box::new([97 + i]);
pool.execute_to(tx.clone(), inp);
}
// output is a permutation of "abcdefgh"
let mut output = rx.iter()
.take(n_jobs as usize)
.fold(String::new(), |mut a, b| {
a.push_str(&b.unwrap());
a
})
.into_bytes();
output.sort();
assert_eq!(output, b"abcdefgh");
} ```
This work is derivative of threadpool.
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.