threadpool
threadpool copied to clipboard
Go Thread Pool
ThreadPool
Go / Golang thread-pool library that supports nested job queuing. The general programming pattern is described in the following schematic:
// create a new thread pool with 5 working threads and
// a queue buffer of 100 (in addition to this thread, 4
// more threads will be launched that start reading
// from the job queue)
pool := threadpool.New(5, 100)
// allocate some memory for each thread
data := make([]ThreadData, pool.NumberOfThreads())
// jobs are always grouped, get a new group index
g1 := pool.NewJobGroup()
// add a new job to group g1, if there is only one thread in the pool
// (i.e. if pool == threadpool.ThreadPool{}), the main thread will process this
// job immediately
if err := pool.AddJob(g1, func(pool threadpool.ThreadPool, erf func() error) error {
// check if there was an error in one of the other tasks
if erf() != nil {
return nil
}
// notice that this function receives as argument another
// ThreadPool structure, which hides the old pool variable
// in the surrounding scope and carries the thread id of
// the current process
// access global data in a thread-safe way
data := data[pool.GetThreadId()]
// do some work here...
// a job may also add other jobs to the queue...
g2 := pool.NewJobGroup()
if err := pool.AddJob(g2, func(pool threadpool.ThreadPool, erf func() error) error {
// do some work here...
}); err != nil {
// some task returned an error
}
if err := pool.Wait(g2); err != nil {
// some task returned an error
}
// return nil if task is done
return nil
}); err != nil {
// some task returned an error
}
// add other tasks to g1...
// wait until all tasks are done
if err := pool.Wait(g1); err != nil {
// some task returned an error
}
// stop all threads
pool.Stop()
Any of the following functions can be used to add jobs to the queue:
Function | Description |
---|---|
AddJob | add a single job to the queue |
AddRangeJob | add a range job to the queue (replaces for-loops) |
Job | create a job group, add a single job to the queue and wait until it is done |
RangeJob | create a job group, add a range job to the queue and wait until it is done |
Examples
Example 1: Simple job queuing
// create a new thread pool with 5 working threads and
// a queue buffer of 100 (in addition to this thread, 4
// more threads will be launched that start reading
// from the job queue)
pool := threadpool.New(5, 100)
// jobs are always grouped, get a new group index
g := pool.NewJobGroup()
// slice carrying the results
r := make([]int, 20)
// add jobs to the thread pool, where the i'th job sets
// r[i] to the thread index
for i_, _ := range r {
i := i_
pool.AddJob(g, func(pool threadpool.ThreadPool, erf func() error) error {
time.Sleep(10 * time.Millisecond)
r[i] = pool.GetThreadId()+1
return nil
})
}
// wait until all jobs in group g are done, meanwhile, this thread
// is also used as a worker
pool.Wait(g)
fmt.Println("result:", r)
Example 2: Distribute range equally among threads
pool := threadpool.New(5, 100)
g := pool.NewJobGroup()
r := make([]int, 20)
// instead of creating len(r) jobs, this method splits
// r into #threads pieces and adds one job for each piece
// to increase efficiency
pool.AddRangeJob(0, len(r), g, func(i int, pool threadpool.ThreadPool, erf func() error) error {
time.Sleep(10 * time.Millisecond)
r[i] = pool.GetThreadId()+1
return nil
})
pool.Wait(g)
fmt.Println("result:", r)
Example 3: Error handling
pool := threadpool.New(5, 100)
g := pool.NewJobGroup()
r := make([]int, 20)
if err := pool.AddRangeJob(0, len(r), g, func(i int, pool threadpool.ThreadPool, erf func() error) error {
time.Sleep(10 * time.Millisecond)
// stop if there was an error in one of the
// previous jobs
if erf() != nil {
// stop if there was an error
return nil
}
if i == 2 {
r[i] = -1
return fmt.Errorf("error in thread %d", pool.GetThreadId())
} else {
r[i] = pool.GetThreadId()+1
return nil
}
}); err != nil {
fmt.Println(err)
}
if err := pool.Wait(g); err != nil {
fmt.Println(err)
}
fmt.Println("result:", r)
Example 4: Nested job queuing
pool := threadpool.New(5, 100)
g0 := pool.NewJobGroup()
r := make([][]int, 5)
pool.AddRangeJob(0, len(r), g0, func(i int, pool threadpool.ThreadPool, erf func() error) error {
r[i] = make([]int, 5)
// get a new job group for filling the i'th sub-slice, which allows
// us to wait until the sub-slice is filled
gi := pool.NewJobGroup()
for j_, _ := range r[i] {
j := j_
pool.AddJob(gi, func(pool threadpool.ThreadPool, erf func() error) error {
time.Sleep(10 * time.Millisecond)
r[i][j] = pool.GetThreadId()+1
return nil
})
}
// wait until sub-slice i is filled
pool.Wait(gi)
return nil
})
// wait until the whole slice is filled
pool.Wait(g0)
fmt.Println("result:", r)