workerpool-go
workerpool-go copied to clipboard
doesn't work with example code when using `NewPoolWithResults`
If I take the example from the documentation but switch it to `NewPoolWithResults`:
// main is the reporter's reproduction of the hang: it submits 100 jobs, calls
// StopAndWait, and only afterwards ranges over p.Results, all on one goroutine.
func main() {
// The error returned by NewPoolWithResults is discarded here.
p, _ := workerpool.NewPoolWithResults(4, func(job workerpool.Job[float64], workerID int) (int, error) {
result := math.Sqrt(job.Payload)
fmt.Println("result:", result)
// The square root is only printed; the handler always returns 0 as its result.
return 0, nil
})
for i := 0; i < 100; i++ {
p.Submit(float64(i))
}
p.StopAndWait()
// NOTE(review): per the discussion around this snippet, the program reportedly
// blocks before reaching this loop, so nothing drains p.Results.
for result := range p.Results {
fmt.Println(result)
}
}
The code never finishes; it gets stuck at around iteration 3.
Maybe I should improve the documentation to explain more how to use this library correctly.
Job submission and reading of results have to run concurrently so they have to run on different goroutines.
In your code you never actually read from the `p.Results` channel: you think you do, but the program never reaches the second loop. After a couple of job submissions the `p.Results` channel is full, the workers of the pool cannot write their results to it, and they all block (deadlock).
I fixed your code by putting the submission loop in a goroutine. I also changed the handler function to return `result, nil` and to use `(float64, error)` as its return type.
Here is the fixed code:
package main
import (
"fmt"
"math"
"go.mitsakis.org/workerpool"
)
// main computes the square roots of 0 through 99 on a pool of 4 workers and
// prints each result value as it is received.
func main() {
	// sqrtHandler prints and returns the square root of the job's payload.
	sqrtHandler := func(job workerpool.Job[float64], workerID int) (float64, error) {
		root := math.Sqrt(job.Payload)
		fmt.Println("result:", root)
		return root, nil
	}
	wp, _ := workerpool.NewPoolWithResults(4, sqrtHandler)

	// Submit from a separate goroutine so that the loop below can read from
	// wp.Results while jobs are still being submitted.
	go func() {
		for n := 0; n < 100; n++ {
			wp.Submit(float64(n))
		}
		wp.StopAndWait()
	}()

	for res := range wp.Results {
		fmt.Println(res.Value)
	}
}
I ended up adding a helper on top of workerpool that makes it easier to run a handler over a slice of inputs and collect all the outputs so they can be processed as a whole:
package pool
import (
"fmt"
"go.mitsakis.org/workerpool"
)
// Result holds the outcome of running a handler on a single input: the value
// the handler returned and the error it returned, if any.
type Result[O any] struct {
// Value is the value returned by the handler.
Value O
// Error is the error returned by the handler; nil when the call succeeded.
Error error
}
// IndexedInput pairs an input value with its position in the input slice, so
// that a result can be matched back to the input that produced it.
type IndexedInput[V any] struct {
// Index is the position of Value in the input slice.
Index int
// Value is the input itself.
Value V
}
// RunAndCollectResults runs handler over every element of inputs on a worker
// pool of numOfWorkers workers and returns one Result per input, stored at the
// same index as the input that produced it.
//
// The returned error is nil when no handler call failed, the handler's own
// error when exactly one call failed, and a summary error stating the number
// of failures when more than one call failed. If the pool cannot be created,
// that error is returned together with a nil slice.
func RunAndCollectResults[I, O any](numOfWorkers int, inputs []I, handler func(index int, input I) (O, error)) ([]Result[O], error) {
	// Adapt the caller's handler to the job-based signature the pool expects.
	worker := func(job workerpool.Job[IndexedInput[I]], workerID int) (O, error) {
		item := job.Payload
		return handler(item.Index, item.Value)
	}

	p, err := workerpool.NewPoolWithResults(numOfWorkers, worker)
	if err != nil {
		return nil, err
	}

	// Feed the pool from its own goroutine so that results can be consumed
	// below while submission is still in progress.
	go func() {
		for idx := range inputs {
			p.Submit(IndexedInput[I]{Index: idx, Value: inputs[idx]})
		}
		p.StopAndWait()
	}()

	// Place each result in the slot of the input it came from.
	outputs := make([]Result[O], len(inputs))
	for res := range p.Results {
		outputs[res.Job.Payload.Index] = Result[O]{Value: res.Value, Error: res.Error}
	}

	// Report either the singular handler error or, when several calls failed,
	// an error that indicates how many did.
	var (
		lastErr error
		failed  int
	)
	for i := range outputs {
		if e := outputs[i].Error; e != nil {
			lastErr = e
			failed++
		}
	}
	if failed > 1 {
		return outputs, fmt.Errorf("multiple errors: %d errors returned by handler", failed)
	}
	return outputs, lastErr
}
with an example in the test as
package pool_test
import (
"errors"
"math"
"testing"
"github.com/stretchr/testify/suite"
"openmetagame.dev/pkg/pool"
)
// poolTestSuite groups the tests for pool.RunAndCollectResults; it embeds
// suite.Suite to get the testify suite runner and assertion helpers.
type poolTestSuite struct {
suite.Suite
}
// TestStandard checks that, when no handler call fails, RunAndCollectResults
// returns a nil error and one result per input, in input order.
func (s *poolTestSuite) TestStandard() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	outputs, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), nil
	})

	expected := make([]pool.Result[float64], 0, len(inputs))
	for _, input := range inputs {
		expected = append(expected, pool.Result[float64]{Value: math.Sqrt(input), Error: nil})
	}

	s.Assertions.NoError(err)
	// Equal, not ElementsMatch: results are stored at the index of their
	// input, so the order of outputs is part of the contract and must be
	// verified. ElementsMatch would pass even if results were misplaced.
	s.Assertions.Equal(expected, outputs)
}
// TestHandlerErrors checks that, when every handler call fails,
// RunAndCollectResults returns the summary error reporting how many calls
// failed.
func (s *poolTestSuite) TestHandlerErrors() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	expectedErr := errors.New("foo")
	_, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), expectedErr
	})

	// EqualError asserts both that err is non-nil and that its message
	// matches, so a nil err produces an assertion failure rather than a
	// nil dereference on err.Error().
	s.Assertions.EqualError(err, "multiple errors: 20 errors returned by handler")
}
// TestSuite is the go test entry point; it runs every test method defined on
// poolTestSuite.
func TestSuite(t *testing.T) {
	suite.Run(t, &poolTestSuite{})
}