workerpool-go icon indicating copy to clipboard operation
workerpool-go copied to clipboard

doesn't work with example code when using `NewPoolWithResults`

Open ethanquix opened this issue 1 year ago • 2 comments

If I take the example from the documentation but I add WithResult:

func main() {
	p, _ := workerpool.NewPoolWithResults(4, func(job workerpool.Job[float64], workerID int) (int, error) {
		result := math.Sqrt(job.Payload)
		fmt.Println("result:", result)
		return 0, nil
	})
	for i := 0; i < 100; i++ {
		p.Submit(float64(i))
	}
	p.StopAndWait()

	for result := range p.Results {
		fmt.Println(result)
	}
}

the code never stops and is stuck at iteration ~3

ethanquix avatar Aug 13 '23 04:08 ethanquix

Maybe I should improve the documentation to explain more how to use this library correctly.

Job submission and reading of results have to run concurrently so they have to run on different goroutines.

In your code you don't read from the p.Results channel (you think you do but the program never reaches the second loop), so after a couple of job submissions the p.Results channel is full and the workers of the pool cannot write their result to the p.Results channel so they all block (deadlock).

I fixed your code by putting the submission loop in a goroutine. And I also changed the handler function to return result, nil and used (float64, nil) as a return type.

Here is the fixed code:

package main

import (
	"fmt"
	"math"

	"go.mitsakis.org/workerpool"
)

func main() {
	p, _ := workerpool.NewPoolWithResults(4, func(job workerpool.Job[float64], workerID int) (float64, error) {
		result := math.Sqrt(job.Payload)
		fmt.Println("result:", result)
		return result, nil
	})
	go func() {
		for i := 0; i < 100; i++ {
			p.Submit(float64(i))
		}
		p.StopAndWait()
	}()

	for result := range p.Results {
		fmt.Println(result.Value)
	}
}

cmitsakis avatar Aug 13 '23 12:08 cmitsakis

I ended up adding a helper for workerpool that made running over the inputs and collecting the outputs to be processed as a whole easier

package pool

import (
	"fmt"

	"go.mitsakis.org/workerpool"
)

type Result[O any] struct {
	Value O
	Error error
}

type IndexedInput[V any] struct {
	Index int
	Value V
}

func RunAndCollectResults[I, O any](numOfWorkers int, inputs []I, handler func(index int, input I) (O, error)) ([]Result[O], error) {
	outputs := make([]Result[O], len(inputs))

	p, err := workerpool.NewPoolWithResults(numOfWorkers, func(job workerpool.Job[IndexedInput[I]], workerID int) (O, error) {
		return handler(job.Payload.Index, job.Payload.Value)
	})
	if err != nil {
		return nil, err
	}

	go func() {
		for i, input := range inputs {
			indexed := IndexedInput[I]{Index: i, Value: input}
			p.Submit(indexed)
		}
		p.StopAndWait()
	}()

	// Apply the results to the outputs based on the index
	for r := range p.Results {
		outputs[r.Job.Payload.Index].Value = r.Value
		outputs[r.Job.Payload.Index].Error = r.Error
	}

	// If there is an error then return either the singlar error or something that indicates multiple errors
	count := 0
	for _, output := range outputs {
		if output.Error != nil {
			err = output.Error
			count++
		}
	}

	if count > 1 {
		err = fmt.Errorf("multiple errors: %d errors returned by handler", count)
	}

	return outputs, err
}

with an example in the test as

package pool_test

import (
	"errors"
	"math"
	"testing"

	"github.com/stretchr/testify/suite"

	"openmetagame.dev/pkg/pool"
)

type poolTestSuite struct {
	suite.Suite
}

func (s *poolTestSuite) TestStandard() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	outputs, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), nil
	})

	expected := make([]pool.Result[float64], 0, len(inputs))
	for _, input := range inputs {
		expected = append(expected, pool.Result[float64]{Value: math.Sqrt(input), Error: nil})
	}

	s.Assertions.NoError(err)
	s.Assertions.ElementsMatch(expected, outputs)
}

func (s *poolTestSuite) TestHandlerErrors() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	expectedErr := errors.New("foo")

	_, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), expectedErr
	})
	s.Assertions.Error(err)
	s.Assertions.Equal("multiple errors: 20 errors returned by handler", err.Error())
}

func TestSuite(t *testing.T) {
	suite.Run(t, new(poolTestSuite))
}

ben-tbotlabs avatar Nov 22 '23 22:11 ben-tbotlabs