boomer icon indicating copy to clipboard operation
boomer copied to clipboard

fix: yield processor explicitly

Open zezhehh opened this issue 1 year ago • 4 comments

It will spend more time within the potentially continuous goroutine, as we anticipate the scheduler to switch contexts more frequently under high concurrency, especially in the local runner. To address it, we explicitly yield the processor in the loop block with a default statement.

@myzhan :D

zezhehh avatar Dec 01 '23 13:12 zezhehh

@zezhehh I'm not sure why we should do this explicitly.

myzhan avatar Dec 04 '23 12:12 myzhan

@zezhehh I'm not sure why we should do this explicitly.

@myzhan I think it explains well 👉 https://medium.com/@genchilu/one-goroutine-trap-when-would-goroutine-context-switch-f91b9a20b8e8

Of course it will not only spawn 4 goroutines before any noticeable stuck, it is stuck for me around every 700 goroutines without runtime.Gosched, while there's no apparent stuck with runtime.Gosched (and finish spawning for 10000 goroutines much quicker).

zezhehh avatar Dec 04 '23 14:12 zezhehh

OK, now I get it. Can you provide your code of boomer task to reproduce this issue?

myzhan avatar Dec 11 '23 11:12 myzhan

OK, now I get it. Can you provide your code of boomer task to reproduce this issue?

@myzhan Adding explicit runtime.Gosched() in boomer will reduce the chance of a panic. (try to run it multiple times)

package main

import (
	"sync"
	"time"
	"runtime"

	"github.com/myzhan/boomer"
)

func someSequentialActions() {
	for step := 0; step < 20; step++ {
		sum := 0
		for i := 0; i < 100000; i++ {
			sum += i
			runtime.Gosched()
		}
	}
}

const (
	interval   = 50 * time.Millisecond
	spawnCount = 1000
	spawnRate  = 50
)

type Foo struct {
	stop   chan any
	stream chan int
}

func (f *Foo) Generate() {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-f.stop:
			ticker.Stop()
			return
		case <-ticker.C:
			f.stream <- 1
		}
	}
}

func (f *Foo) Receive(stream <-chan int) {
	maxInterval := 2 * interval
	for {
		select {
		case <-f.stop:
			return
		case <-time.After(maxInterval):
			panic("timeout")
		case <-stream:
			// do something
		}
	}

}

func (f *Foo) Stop() {
	close(f.stop)
}

func main() {
	wg := sync.WaitGroup{}
	task := &boomer.Task{
		Name:   "foo",
		Weight: 10,
		Fn: func() {
			wg.Add(1)
			defer wg.Done()

			f := &Foo{
				stop:   make(chan any),
				stream: make(chan int),
			}
			defer f.Stop()

			go f.Generate()
			go f.Receive(f.stream)

			someSequentialActions()
		},
	}

	instance := boomer.NewStandaloneBoomer(spawnCount, spawnRate)
	go instance.Run(task)

	time.Sleep(100 * time.Second)
	instance.Quit()
	wg.Wait()
	println("done")
}

zezhehh avatar Dec 13 '23 14:12 zezhehh

I can't reproduce the stuck issue, but I think it's harmless to add runtime.Gosched().

myzhan avatar May 07 '24 02:05 myzhan