
conc/stream.Stream runs sequentially if no max goroutines are set

Open • pascalmail opened this issue 5 months ago • 1 comment

If we do not set a concurrency limit (no call to WithMaxGoroutines), Stream does its work almost sequentially: in the output below, at most two tasks run at a time.

I think this is caused by https://github.com/sourcegraph/conc/blob/v0.3.0/stream/stream.go#L112, where conc sizes the internal callback queue as MaxGoroutines() + 1, which is 1 when no limit is set, instead of leaving it unbounded. Is this expected?

Example:


func TestParallelWork(t *testing.T) {
	times := []int{20, 52, 16, 45, 4, 80}

	// Without concurrency limit, the goroutines scheduling blocks until previous callbacks are executed.
	//0 queueing 20ms
	//0 queueing 52ms
	//20 20ms
	//20 queueing 16ms
	//52 52ms
	//52 16ms
	//32 queueing 45ms
	//0 queueing 4ms
	//78 45ms
	//46 4ms
	//46 queueing 80ms
	//127 80ms
	t.Run("stream without concurrency limit", func(t *testing.T) {
		stream := cstream.New()
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			t := time.Now()
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Now().Sub(t).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// This will print in the order the tasks were submitted
				return func() { fmt.Println(time.Now().Sub(t).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})

	// With concurrency limit, the goroutines are executed concurrently up to the limit.
	//0 queueing 20ms
	//0 queueing 52ms
	//0 queueing 16ms
	//0 queueing 45ms
	//0 queueing 4ms
	//0 queueing 80ms
	//20 20ms
	//52 52ms
	//52 16ms
	//52 45ms
	//52 4ms
	//81 80ms
	t.Run("stream with concurrency limit", func(t *testing.T) {
		stream := cstream.New().WithMaxGoroutines(len(times))
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			t := time.Now()
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Now().Sub(t).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// This will print in the order the tasks were submitted
				return func() { fmt.Println(time.Now().Sub(t).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})
}

pascalmail • Jul 25 '25 05:07

@pascalmail I agree this should be considered a bug. It's not really specific to the MaxGoroutines = 0 case either, though it's particularly bad then.

Suppose you set MaxGoroutines = 2. I would expect the program to keep running 2 goroutines at a time until they're all done, and to run the callbacks in order, each as soon as its goroutine finishes. Suppose, however, that the first goroutine blocks for a very long time, but all the other goroutines finish very quickly.

Here's roughly how it would proceed:

  1. Start. Stream is empty --> [] running, [] callbacks in queue
  2. goroutine 0 submitted --> [0] running, [0] callback in queue
  3. goroutine 1 submitted --> [0,1] running, [0,1] callbacks in queue
  4. goroutine 1 finishes --> [0] running, [0,1] callbacks in queue
  5. goroutine 2 submitted --> [0,2] running, [0,1,2] callbacks in queue
  6. goroutine 2 finishes --> [0] running, [0,1,2] callbacks in queue
  7. goroutine 3 submitted --> the callback queue is full (since MaxGoroutines+1 = 3), so this blocks before 3 can begin execution.

At that point, the system is stuck until goroutine 0 finally finishes. This problem is relevant for any value of MaxGoroutines, since in general, the system gets stuck when goroutine n is still processing as goroutine (n + MaxGoroutines + 1) is submitted.

Streams could be rearchitected to collect an unbounded list of pending callbacks, which would solve this problem, but that could require a lot of memory, so it has its own drawbacks. The most sensible option may be another parameter on Streams for the maximum callback queue length.

srabraham • Nov 05 '25 15:11