conc/stream.Stream runs sequentially if MaxGoroutines is not set
If no concurrency limit is set, Stream does its work effectively sequentially.
I think this is caused by https://github.com/sourcegraph/conc/blob/v0.3.0/stream/stream.go#L112, where conc sizes the internal callback queue as MaxGoroutines + 1. That is 1 when no limit is set, rather than unlimited. Is this expected?
Example:
package stream_test

import (
	"fmt"
	"testing"
	"time"

	cstream "github.com/sourcegraph/conc/stream"
)

func TestParallelWork(t *testing.T) {
	times := []int{20, 52, 16, 45, 4, 80}

	// Without a concurrency limit, scheduling a new goroutine blocks until
	// earlier callbacks have executed. Observed output:
	// 0 queueing 20ms
	// 0 queueing 52ms
	// 20 20ms
	// 20 queueing 16ms
	// 52 52ms
	// 52 16ms
	// 32 queueing 45ms
	// 0 queueing 4ms
	// 78 45ms
	// 46 4ms
	// 46 queueing 80ms
	// 127 80ms
	t.Run("stream without concurrency limit", func(t *testing.T) {
		stream := cstream.New()
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			start := time.Now() // time at which this task is submitted
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Since(start).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// Callbacks print in the order the tasks were submitted.
				return func() { fmt.Println(time.Since(start).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})

	// With a concurrency limit, the goroutines run concurrently up to the
	// limit. Observed output:
	// 0 queueing 20ms
	// 0 queueing 52ms
	// 0 queueing 16ms
	// 0 queueing 45ms
	// 0 queueing 4ms
	// 0 queueing 80ms
	// 20 20ms
	// 52 52ms
	// 52 16ms
	// 52 45ms
	// 52 4ms
	// 81 80ms
	t.Run("stream with concurrency limit", func(t *testing.T) {
		stream := cstream.New().WithMaxGoroutines(len(times))
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			start := time.Now() // time at which this task is submitted
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Since(start).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// Callbacks print in the order the tasks were submitted.
				return func() { fmt.Println(time.Since(start).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})
}
@pascalmail I agree this should be considered a bug. It's not really specific to the MaxGoroutines = 0 case either, though it's particularly bad then.
Consider setting MaxGoroutines = 2. I would expect the program to keep running 2 goroutines at a time until they're all done, and to print the callbacks sequentially as each goroutine finishes. Suppose, however, that the first goroutine blocks for a very long time, while all the other goroutines finish very quickly.
Here's roughly how it would proceed:
- Start. Stream is empty --> [] running, [] callbacks in queue
- goroutine 0 submitted --> [0] running, [0] callback in queue
- goroutine 1 submitted --> [0,1] running, [0,1] callbacks in queue
- goroutine 1 finishes --> [0] running, [0,1] callbacks in queue
- goroutine 2 submitted --> [0,2] running, [0,1,2] callbacks in queue
- goroutine 2 finishes --> [0] running, [0,1,2] callbacks in queue
- goroutine 3 submitted --> the callback queue is full (since MaxGoroutines+1 = 3), so this blocks before 3 can begin execution.
At that point, the system is stuck until goroutine 0 finally finishes. This problem applies to any value of MaxGoroutines: in general, the system gets stuck whenever goroutine n is still running when goroutine (n + MaxGoroutines + 1) is submitted.
It's possible to rearchitect Stream to collect an unbounded list of callbacks, which would solve this problem. However, that could require a lot of memory, so it has its own problems. The most sensible option may be to add another parameter to Stream for the maximum callback queue length.