Blog icon indicating copy to clipboard operation
Blog copied to clipboard

singleflight 源码阅读

Open LLLeon opened this issue 4 years ago • 0 comments

缓存击穿

在缓存系统中,当某个热点数据的缓存过期时,如果瞬间有大量请求到达 DB,可能会导致 DB 出现问题。

怎么解决?可以让先到达的请求将最新数据更新到缓存,其它请求再使用缓存数据即可。

Go 中的 singleflight 包就可以用于这个场景。它可以只让其中一个请求得到执行,其余请求会阻塞到该请求返回执行结果并使用该结果,从而达到防止击穿的效果。

singleflight 源码

singleflight.go 的源码比较简单:

// 表示一组请求
type call struct {
  // 用来阻塞其余请求
	wg sync.WaitGroup

	// 被调用的函数返回的结果和 err 赋值给这两个字段
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key while the call was still in flight.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

// 用来存储不同 key 的请求组
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

// 执行指定函数并返回结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
  // 延迟初始化
	if g.m == nil {
		g.m = make(map[string]*call)
	}
  // 如果当前请求之前已经有对该 key 的请求, 则阻塞当前请求
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock() // 释放锁
		c.wg.Wait() // wg 的这种用法妙啊

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
    // 使用前面请求返回的结果
		return c.val, c.err, true
	}
  
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c // 用 key 来标识一组同样的请求
	g.mu.Unlock()

  // 调用 fn 并返回结果
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// 与 Do 方法效果一样,不过这里不阻塞当前请求,而是直接返回一个 channel 用于接收结果
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1) // 当前请求从这个 ch 接收结果
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}

	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

  // 异步调用 fn
	go g.doCall(c, key, fn)

	return ch
}

// doCall handles the single call for a key.
// 对 fn 执行调用, 并将结果
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
  // 这个 defer 会后执行, 负责判断
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		c.wg.Done()
		g.mu.Lock()
		defer g.mu.Unlock()
		if !c.forgotten { // 返回之前删除这个 key, 标识着本次对 key 的调用完成
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
      // doCall 返回之前将调用结果写入到所有其余等待结果的请求的 channel 中
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

  // 执行 fn 的部分, 而且是在一个匿名函数里面执行.
  // 这里又用到了一个 defer(这个 defer 会先执行), 用来区分 panic 和 runtime.Goexit.
	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r) // 如果 panic 了, 修改 c.err 的值
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

代码虽然不多,里面一些巧妙的用法可以学习。

LLLeon avatar May 09 '21 15:05 LLLeon