gocookbook icon indicating copy to clipboard operation
gocookbook copied to clipboard

使用sync.Cond实现一个有限容量的队列

Open kevinyan815 opened this issue 4 years ago • 0 comments

Cond 是为等待 / 通知场景下的并发问题提供支持的。它提供了条件变量的三个基本方法 Signal、Broadcast 和 Wait,为并发的 goroutine 提供等待 / 通知机制。在实践中,处理等待 / 通知的场景时,我们常常会使用 Channel 替换 Cond,因为 Channel 类型使用起来更简洁,而且不容易出错。但是对于需要重复调用 Broadcast 的场景,比如 Kubernetes 的调度队列ScheduleQueue就是依赖sync.Cond实现的,具体怎么实现的可以看看这篇文章:Kubernetes调度队列对sync.Cond的使用。每次往队列中成功增加了元素后就需要调用 Broadcast 通知所有的等待者,使用 Cond 就再合适不过了。使用 Cond 之所以容易出错,就是 Wait 调用需要加锁,以及被唤醒后一定要检查条件是否真的已经满足。

sync包里的WaitGroup和Cond两个原语有本质上的区别,WaitGroup 是主 goroutine 等待确定数量的子 goroutine 完成任务;而 Cond 是等待某个条件满足,这个条件的修改可以被任意多的 goroutine 更新,而且 Cond 的 Wait 不关心也不知道其他 goroutine 的数量,只关心等待条件。而且 Cond 还有单个通知的机制,也就是 Signal 方法。

下面是用sync.Cond实现的一个类似Kubernetes调度队列的有限队列。

sync.Cond的使用方法和实现原理详见:https://time.geekbang.org/column/article/299312

package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
)

type Queue struct {
	cond *sync.Cond
	data []interface{}
	capc int
	logs []string
}

func NewQueue(capacity int) *Queue {
	return &Queue{cond: &sync.Cond{L: &sync.Mutex{}}, data: make([]interface{}, 0), capc: capacity, logs: make([]string, 0)}
}

func (q *Queue) Enqueue(d interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for len(q.data) == q.capc {
		q.cond.Wait()
	}
	// FIFO入队
	q.data = append(q.data, d)
	// 记录操作日志
	q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
	// 通知其他waiter进行Dequeue或Enqueue操作
	q.cond.Broadcast()

}

func (q *Queue) Dequeue() (d interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for len(q.data) == 0 {
		q.cond.Wait()
	}
	// FIFO出队
	d = q.data[0]
	q.data = q.data[1:]
	// 记录操作日志
	q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))
	// 通知其他waiter进行Dequeue或Enqueue操作
	q.cond.Broadcast()
	return
}

func (q *Queue) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return len(q.data)
}

func (q *Queue) String() string {
	var b strings.Builder
	for _, log := range q.logs {
		//fmt.Fprint(&b, log)
		b.WriteString(log)
	}
	return b.String()
}

func main() {
	Example()
}

func Example() {
	var wg sync.WaitGroup
	//容量为5的阻塞队列
	que := NewQueue(3)

	// 生成随机命令
	for i, cmd := range Commands(20, true) {
		wg.Add(1)

		// 0表示入队,1表示出队
		if cmd == 0 {
			go func(id int) {
				defer wg.Done()
				que.Enqueue(id)
			}(i)
		} else {
			go func(id int) {
				defer wg.Done()
				que.Dequeue()
			}(i)
		}
	}

	/*
		// 当执行出队、入队命令的worker数『不相等』时
		// 最后会有worker阻塞在出队或入队方法上
		// 同时主goroutine会阻塞在wg.Wait()上
		// 此时所有goroutine都阻塞了
		// 下面的goroutine会避免该问题
		// 但仍需新worker唤醒阻塞在队列上的worker
		go func() {
			for {
				select{
				case <-time.After(time.Second):
					runtime.Gosched()
				}
			}
		}()
	*/

	wg.Wait()

	// 输出操作日志
	fmt.Println(que)
}

// Commands 用于产生出队、入队命令
func Commands(N int, random bool) []int {
	if N%2 != 0 {
		panic("will deadlock!")
	}
	// 0表示入队,1表示出队
	commands := make([]int, N)
	for i := 0; i < N; i++ {
		if i%2 == 0 {
			commands[i] = 1
		}
	}

	if random {
		// shuffle algorithms
		for i := len(commands) - 1; i > 0; i-- {
			j := rand.Intn(i + 1)
			commands[i], commands[j] = commands[j], commands[i]
		}
	}

	return commands
}

kevinyan815 avatar Nov 01 '20 03:11 kevinyan815