gocookbook
gocookbook copied to clipboard
使用sync.Cond实现一个有限容量的队列
Cond 是为等待 / 通知场景下的并发问题提供支持的。它提供了条件变量的三个基本方法 Signal、Broadcast 和 Wait,为并发的 goroutine 提供等待 / 通知机制。在实践中,处理等待 / 通知的场景时,我们常常会使用 Channel 替换 Cond,因为 Channel 类型使用起来更简洁,而且不容易出错。但是对于需要重复调用 Broadcast 的场景,比如 Kubernetes 的调度队列ScheduleQueue就是依赖sync.Cond实现的,具体怎么实现的可以看看这篇文章:Kubernetes调度队列对sync.Cond的使用。每次往队列中成功增加了元素后就需要调用 Broadcast 通知所有的等待者,使用 Cond 就再合适不过了。使用 Cond 之所以容易出错,就是 Wait 调用需要加锁,以及被唤醒后一定要检查条件是否真的已经满足。
sync包里的WaitGroup和Cond两个原语有本质上的区别,WaitGroup 是主 goroutine 等待确定数量的子 goroutine 完成任务;而 Cond 是等待某个条件满足,这个条件的修改可以被任意多的 goroutine 更新,而且 Cond 的 Wait 不关心也不知道其他 goroutine 的数量,只关心等待条件。而且 Cond 还有单个通知的机制,也就是 Signal 方法。
下面是用sync.Cond实现的一个类似Kubernetes调度队列的有限队列。
sync.Cond的使用方法和实现原理详见:https://time.geekbang.org/column/article/299312
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
)
type Queue struct {
cond *sync.Cond
data []interface{}
capc int
logs []string
}
func NewQueue(capacity int) *Queue {
return &Queue{cond: &sync.Cond{L: &sync.Mutex{}}, data: make([]interface{}, 0), capc: capacity, logs: make([]string, 0)}
}
func (q *Queue) Enqueue(d interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.data) == q.capc {
q.cond.Wait()
}
// FIFO入队
q.data = append(q.data, d)
// 记录操作日志
q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
// 通知其他waiter进行Dequeue或Enqueue操作
q.cond.Broadcast()
}
func (q *Queue) Dequeue() (d interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.data) == 0 {
q.cond.Wait()
}
// FIFO出队
d = q.data[0]
q.data = q.data[1:]
// 记录操作日志
q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))
// 通知其他waiter进行Dequeue或Enqueue操作
q.cond.Broadcast()
return
}
func (q *Queue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.data)
}
func (q *Queue) String() string {
var b strings.Builder
for _, log := range q.logs {
//fmt.Fprint(&b, log)
b.WriteString(log)
}
return b.String()
}
func main() {
Example()
}
func Example() {
var wg sync.WaitGroup
//容量为5的阻塞队列
que := NewQueue(3)
// 生成随机命令
for i, cmd := range Commands(20, true) {
wg.Add(1)
// 0表示入队,1表示出队
if cmd == 0 {
go func(id int) {
defer wg.Done()
que.Enqueue(id)
}(i)
} else {
go func(id int) {
defer wg.Done()
que.Dequeue()
}(i)
}
}
/*
// 当执行出队、入队命令的worker数『不相等』时
// 最后会有worker阻塞在出队或入队方法上
// 同时主goroutine会阻塞在wg.Wait()上
// 此时所有goroutine都阻塞了
// 下面的goroutine会避免该问题
// 但仍需新worker唤醒阻塞在队列上的worker
go func() {
for {
select{
case <-time.After(time.Second):
runtime.Gosched()
}
}
}()
*/
wg.Wait()
// 输出操作日志
fmt.Println(que)
}
// Commands 用于产生出队、入队命令
func Commands(N int, random bool) []int {
if N%2 != 0 {
panic("will deadlock!")
}
// 0表示入队,1表示出队
commands := make([]int, N)
for i := 0; i < N; i++ {
if i%2 == 0 {
commands[i] = 1
}
}
if random {
// shuffle algorithms
for i := len(commands) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
commands[i], commands[j] = commands[j], commands[i]
}
}
return commands
}