A simple thread pool for stateful parallel task execution

workerpool

A worker threadpool used to execute a number of jobs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.

Each Worker runs in its own thread and is implemented according to the following trait:

pub trait Worker : Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
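
To make the role of &mut self concrete, here is a minimal sketch of a stateful worker. The trait is re-declared locally so the snippet builds with only the standard library; RunningTotal and run_on_one_worker are hypothetical names invented for this illustration and are not part of the crate.

```rust
// Local copy of the trait shown above, so this sketch builds without the crate.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// Hypothetical worker: keeps a running total across the jobs it handles.
#[derive(Default)]
pub struct RunningTotal {
    total: i64,
}

impl Worker for RunningTotal {
    type Input = i64;
    type Output = i64;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // State persists between calls because `execute` takes `&mut self`.
        self.total += inp;
        self.total
    }
}

/// Hypothetical helper: feeds inputs to one default-constructed worker, in order.
pub fn run_on_one_worker<W: Worker>(inputs: Vec<W::Input>) -> Vec<W::Output> {
    let mut worker = W::default();
    inputs.into_iter().map(|inp| worker.execute(inp)).collect()
}

fn main() {
    let outputs = run_on_one_worker::<RunningTotal>(vec![1, 2, 3]);
    assert_eq!(outputs, vec![1, 3, 6]);
}
```

In a pool, every thread holds its own worker, so state like total above is per worker rather than shared across the pool.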

Usage

[dependencies]
workerpool = "1.2"

To use crossbeam's channels instead of std::sync::mpsc, enable the crossbeam feature:

[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }

This crate provides Pool<W> where W: Worker. With a pool, there are four primary functions of interest:

  • Pool::<MyWorker>::new(n_threads) creates a new pool for a particular Worker.
  • pool.execute(inp) runs the worker on inp without blocking and discards the return value.
  • pool.execute_to(tx, inp) runs the worker on inp without blocking and sends the return value to the given Sender.
  • pool.join() blocks until all tasks (from execute and execute_to) have completed.
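
The difference between execute and execute_to can be sketched with a toy pool built only on the standard library. This is an illustration under stated assumptions, not the crate's implementation: ToyPool, Job, worker_loop, JobCounter, and demo are hypothetical names, the toy does not replace threads that panic, and its finish method consumes the pool, whereas the crate's join is only described above as waiting for tasks to complete.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Local copy of the trait from this README, so the sketch builds without the crate.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// A queued job: the input plus an optional destination for the output.
struct Job<W: Worker> {
    input: W::Input,
    dest: Option<Sender<W::Output>>,
}

/// Hypothetical toy pool; NOT the crate's implementation.
pub struct ToyPool<W: Worker> {
    queue: Sender<Job<W>>,
    threads: Vec<JoinHandle<()>>,
}

// Body of each pool thread: owns one worker and serves jobs until the queue closes.
fn worker_loop<W: Worker>(jobs: &Mutex<Receiver<Job<W>>>) {
    let mut worker = W::default(); // per-thread state
    loop {
        // The lock guard is a temporary, released at the end of this statement.
        let next = jobs.lock().unwrap().recv();
        match next {
            Ok(job) => {
                let out = worker.execute(job.input);
                if let Some(tx) = job.dest {
                    let _ = tx.send(out); // the receiver may already be gone
                }
            }
            Err(_) => break, // queue closed and drained
        }
    }
}

impl<W: Worker + 'static> ToyPool<W> {
    /// Spawns `n_threads` threads, each owning one `W::default()`.
    pub fn new(n_threads: usize) -> Self {
        let (queue, jobs) = channel::<Job<W>>();
        let jobs = Arc::new(Mutex::new(jobs));
        let threads = (0..n_threads)
            .map(|_| {
                let jobs = Arc::clone(&jobs);
                thread::spawn(move || worker_loop::<W>(&jobs))
            })
            .collect();
        ToyPool { queue, threads }
    }

    /// Queues a job and discards its output.
    pub fn execute(&self, inp: W::Input) {
        let job = Job { input: inp, dest: None };
        self.queue.send(job).expect("worker threads are gone");
    }

    /// Queues a job whose output is sent to `tx`.
    pub fn execute_to(&self, tx: Sender<W::Output>, inp: W::Input) {
        let job = Job { input: inp, dest: Some(tx) };
        self.queue.send(job).expect("worker threads are gone");
    }

    /// Closes the queue and waits until every queued job has run.
    pub fn finish(self) {
        drop(self.queue);
        for t in self.threads {
            t.join().unwrap();
        }
    }
}

/// Hypothetical worker: tags each input with how many jobs it has handled.
#[derive(Default)]
pub struct JobCounter {
    handled: usize,
}

impl Worker for JobCounter {
    type Input = &'static str;
    type Output = String;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.handled += 1;
        format!("{}:{}", self.handled, inp)
    }
}

pub fn demo() -> String {
    let pool = ToyPool::<JobCounter>::new(1); // one thread keeps the order fixed
    pool.execute("first"); // output discarded, but the worker's state advances
    let (tx, rx) = channel();
    pool.execute_to(tx, "second"); // output delivered through the channel
    pool.finish();
    rx.recv().unwrap()
}

fn main() {
    assert_eq!(demo(), "2:second");
}
```

The job sent with execute still runs and updates the worker's state; only its return value is dropped, which is why the second job reports a count of 2.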

One worker is provided out of the box: the stateless ThunkWorker<T> in workerpool::thunk. Its inputs are Thunk<T> values, which are effectively argumentless functions that are Sized + Send. Thunks are created by wrapping a function that returns T with Thunk::of.

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }
    
    assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}

For stateful workers, you have to implement Worker yourself.

Suppose there's a line-delimited process, such as cat or tr, that you'd like to run on many threads in a pool-like manner. You can create a worker that holds the process's stdin and stdout as its state, and use it as follows:

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
    fn default() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        // One-byte inputs b'a' through b'h' (b'a' is 97 in ASCII).
        let inp = Box::new([b'a' + i]);
        pool.execute_to(tx.clone(), inp);
    }
    
    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}

Similar libraries

License

This work is derivative of threadpool.

Licensed under either of

  • Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
  • MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.