gitalk icon indicating copy to clipboard operation
gitalk copied to clipboard

使用 Go 实现 lock-free 的队列

Open smallnest opened this issue 5 years ago • 15 comments

https://colobu.com/2020/08/14/lock-free-queue-in-go/

smallnest avatar Sep 06 '20 11:09 smallnest

👍

jjeffcaii avatar Sep 24 '20 09:09 jjeffcaii

weaming avatar Oct 07 '20 10:10 weaming

第三个创建make N赋值,Dequeue取出来前N会是nil

lnnujxxy avatar Nov 03 '20 15:11 lnnujxxy

有个问题,Enqueue里面

if tail == load(&q.tail) {
    if next == nil {

如果tail==q.tail,为什么还要判断next==nil?理论上next=q.tail.next必定是nil啊。

silver-birch-wawa avatar May 04 '21 14:05 silver-birch-wawa

有个问题,Enqueue里面

if tail == load(&q.tail) {
    if next == nil {

如果tail==q.tail,为什么还要判断next==nil?理论上next=q.tail.next必定是nil啊。

哦,没事了,是头插法的原因,如果一开始tail指向头节点,next理论上应该是nil,但是如果在load next之前插入了新节点,那么next会load到一个新节点,但是如果if之前queue pop了唯一的那个新节点,那么还是可以tail==q.tail判定无误,但是next可能非nil。

silver-birch-wawa avatar May 04 '21 15:05 silver-birch-wawa

在本地压测了一下,为啥two-lock_queue性能要比free-lock要快?

zhaozuowu avatar Aug 26 '21 07:08 zhaozuowu

在本地压测了一下,为啥two-lock_queue性能要比free-lock要快?

一开始我是单独写了个函数的,后来我手动内联了一下发现还是free-lock快

silver-birch-wawa avatar Aug 27 '21 03:08 silver-birch-wawa

大佬,我就是用你的函数( smallnest/queue)库在本地测的,压测脚本也是源码里的,我在本地测的结果是two-lock_queue快,为啥?

zhaozuowu avatar Aug 27 '21 05:08 zhaozuowu

大佬,我就是用你的函数( smallnest/queue)库在本地测的,压测脚本也是源码里的,我在本地测的结果是two-lock_queue快,为啥?

你弄错了吧,smallnest不是我....

silver-birch-wawa avatar Aug 27 '21 05:08 silver-birch-wawa

大佬能发一下压测的完整代码么,我用smallnest试的,本地压测的话,确实是two-lock_queue

zhaozuowu avatar Aug 30 '21 07:08 zhaozuowu

大佬,先问下,你第二个方法里实现的(two-lock queue),里面用到的sync.mutex, 这里面也是用到cas原子操作哦,这么看的话好像不能应用在没有原子操作的多处理器上。

wrydeveloper avatar Feb 21 '22 15:02 wrydeveloper

ABA 的问题是不是可以忽略?

dusx1981 avatar Jan 16 '23 03:01 dusx1981

既然lock-free 这个版本性能不如带锁的slice,这个实现有什么适合的应用场景吗

Dong148 avatar Oct 17 '23 05:10 Dong148

补充一个使用了sync.pool优化了性能的版本, 并且增加了中文注释 package lockfreequeue

import ( "sync" "sync/atomic" "unsafe" )

type Queue struct { head unsafe.Pointer tail unsafe.Pointer len uint64 pool sync.Pool }

// NewQueue 创建并返回一个新的队列实例。 // 该函数通过初始化队列的头部和尾部指针,并设置队列长度为0,以及配置一个用于回收directItem的同步池。 // 返回值: // // *Queue - 一个指向新创建的队列的指针。 func NewQueue() *Queue { // 初始化队列的头部,它是一个特殊的directItem,其next指向第一个有效元素,v为nil表示头部不存储值。 head := directItem{ next: nil, v: nil, } // 返回新的队列实例 return &Queue{ head: unsafe.Pointer(&head), // 设置头部指针 tail: unsafe.Pointer(&head), // 设置尾部指针,初始时与头部相同 len: 0, // 初始队列长度为0 pool: sync.Pool{ // 初始化同步池,用于directItem的回收 New: func() any { return &directItem{} // 池的New方法,用于生成新的directItem实例 }, }, } }

// Enqueue 将一个元素添加到队列的末尾。 // 该操作是线程安全的,确保多个goroutine同时添加元素时队列的完整性。 // 参数: // // v: 要添加到队列的元素,可以是任何类型的值。 func (q *Queue) Enqueue(v any) { // 从共享池中获取一个directItem,并初始化它。 // 这样做既减少了内存分配的开销,也统一了队列元素的管理。 i := q.pool.Get().(*directItem) i.next = nil i.v = v

// 初始化last和lastNext指针,用于在循环中追踪队列的尾部。
var last, lastNext *directItem

// 使用CAS操作循环尝试更新队列的尾部。
// 这个循环确保了在多线程环境下队列的尾部能够正确更新。
for {
	// 加载当前队列的尾部指针。
	last = loaditem(&q.tail)
	// 加载当前尾部指针的下一个元素。
	lastNext = loaditem(&last.next)

	// 再次检查队列的尾部指针是否未变,
	// 这是必要的,因为在上一次加载后,可能已经被其他goroutine修改。
	if loaditem(&q.tail) == last {
		// 如果当前尾部的下一个元素为空,说明可以将新元素添加到队列的末尾。
		if lastNext == nil {
			// 使用CAS操作更新尾部的下一个元素为新元素,并更新队列的尾部指针。
			// 这样做保证了更新操作的原子性,避免了竞态条件。
			if casitem(&last.next, lastNext, i) {
				// 更新队列的尾部指针,确保队列的尾部正确指向新的元素。
				casitem(&q.tail, last, i)
				// 原子性增加队列的长度。
				atomic.AddUint64(&q.len, 1)
				// 添加成功,退出函数。
				return
			}
		} else {
			// 如果当前尾部的下一个元素不为空,说明有其他goroutine已经添加了元素,
			// 或者正在尝试添加。此时需要更新队列的尾部指针,以避免死锁。
			// 这个操作确保了队列的持续可操作性,即使在高并发环境下。
			casitem(&q.tail, last, lastNext)
		}
	}
}

}

// Dequeue 从队列中移除并返回一个元素。 // 如果队列为空,函数返回 nil。 // Dequeue 从队列中移除并返回一个元素。这个操作是线程安全的。 // 如果队列为空,函数返回 nil。 func (q *Queue) Dequeue() interface{} { // 定义指向队列首尾和首元素下一个元素的指针 var first, last, firstnext *directItem for { // 读取队列头部和尾部的元素 first = loaditem(&q.head) last = loaditem(&q.tail) // 读取队列头部元素的下一个元素 firstnext = loaditem(&first.next) // 检查队列的头部、尾部和下一个元素是否一致 if first == loaditem(&q.head) { // 检查队列是否为空 if first == last { // 如果队列确实为空 if firstnext == nil { // 队列为空,无法移除元素,返回 nil return nil } // 尾部指针落后,尝试将其向前移动 casitem(&q.tail, last, firstnext) } else { // 在尝试交换头部指针之前读取值,否则另一个移除操作可能会释放下一个节点 v := firstnext.v // 尝试将头部指针移动到下一个节点 if casitem(&q.head, first, firstnext) { // 队列长度减一 atomic.AddUint64(&q.len, ^uint64(0)) // 回收被移除的元素 q.pool.Put(first) // 返回移除的元素 return v } } } } }

hawkli-1994 avatar Aug 17 '24 10:08 hawkli-1994

补充一个使用了sync.pool优化了性能的版本, 并且增加了中文注释

package lockfreequeue

import (
	"sync"
	"sync/atomic"
	"unsafe"
)

type Queue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
	len  uint64
	pool sync.Pool
}

// NewQueue 创建并返回一个新的队列实例。
// 该函数通过初始化队列的头部和尾部指针,并设置队列长度为0,以及配置一个用于回收directItem的同步池。
// 返回值:
//
//	*Queue - 一个指向新创建的队列的指针。
func NewQueue() *Queue {
	// 初始化队列的头部,它是一个特殊的directItem,其next指向第一个有效元素,v为nil表示头部不存储值。
	head := directItem{
		next: nil,
		v:    nil,
	}
	// 返回新的队列实例
	return &Queue{
		head: unsafe.Pointer(&head), // 设置头部指针
		tail: unsafe.Pointer(&head), // 设置尾部指针,初始时与头部相同
		len:  0,                     // 初始队列长度为0
		pool: sync.Pool{ // 初始化同步池,用于directItem的回收
			New: func() any {
				return &directItem{} // 池的New方法,用于生成新的directItem实例
			},
		},
	}
}

// Enqueue 将一个元素添加到队列的末尾。
// 该操作是线程安全的,确保多个goroutine同时添加元素时队列的完整性。
// 参数:
//
//	v: 要添加到队列的元素,可以是任何类型的值。
func (q *Queue) Enqueue(v any) {
	// 从共享池中获取一个directItem,并初始化它。
	// 这样做既减少了内存分配的开销,也统一了队列元素的管理。
	i := q.pool.Get().(*directItem)
	i.next = nil
	i.v = v

	// 初始化last和lastNext指针,用于在循环中追踪队列的尾部。
	var last, lastNext *directItem

	// 使用CAS操作循环尝试更新队列的尾部。
	// 这个循环确保了在多线程环境下队列的尾部能够正确更新。
	for {
		// 加载当前队列的尾部指针。
		last = loaditem(&q.tail)
		// 加载当前尾部指针的下一个元素。
		lastNext = loaditem(&last.next)

		// 再次检查队列的尾部指针是否未变,
		// 这是必要的,因为在上一次加载后,可能已经被其他goroutine修改。
		if loaditem(&q.tail) == last {
			// 如果当前尾部的下一个元素为空,说明可以将新元素添加到队列的末尾。
			if lastNext == nil {
				// 使用CAS操作更新尾部的下一个元素为新元素,并更新队列的尾部指针。
				// 这样做保证了更新操作的原子性,避免了竞态条件。
				if casitem(&last.next, lastNext, i) {
					// 更新队列的尾部指针,确保队列的尾部正确指向新的元素。
					casitem(&q.tail, last, i)
					// 原子性增加队列的长度。
					atomic.AddUint64(&q.len, 1)
					// 添加成功,退出函数。
					return
				}
			} else {
				// 如果当前尾部的下一个元素不为空,说明有其他goroutine已经添加了元素,
				// 或者正在尝试添加。此时需要更新队列的尾部指针,以避免死锁。
				// 这个操作确保了队列的持续可操作性,即使在高并发环境下。
				casitem(&q.tail, last, lastNext)
			}
		}
	}
}

// Dequeue 从队列中移除并返回一个元素。
// 如果队列为空,函数返回 nil。
// Dequeue 从队列中移除并返回一个元素。这个操作是线程安全的。
// 如果队列为空,函数返回 nil。
func (q *Queue) Dequeue() interface{} {
	// 定义指向队列首尾和首元素下一个元素的指针
	var first, last, firstnext *directItem
	for {
		// 读取队列头部和尾部的元素
		first = loaditem(&q.head)
		last = loaditem(&q.tail)
		// 读取队列头部元素的下一个元素
		firstnext = loaditem(&first.next)
		// 检查队列的头部、尾部和下一个元素是否一致
		if first == loaditem(&q.head) {
			// 检查队列是否为空
			if first == last {
				// 如果队列确实为空
				if firstnext == nil {
					// 队列为空,无法移除元素,返回 nil
					return nil
				}
				// 尾部指针落后,尝试将其向前移动
				casitem(&q.tail, last, firstnext)
			} else {
				// 在尝试交换头部指针之前读取值,否则另一个移除操作可能会释放下一个节点
				v := firstnext.v
				// 尝试将头部指针移动到下一个节点
				if casitem(&q.head, first, firstnext) {
					// 队列长度减一
					atomic.AddUint64(&q.len, ^uint64(0))
					// 回收被移除的元素
					q.pool.Put(first)
					// 返回移除的元素
					return v
				}
			}
		}
	}
}

// Length returns the length of the queue.
func (q *Queue) Length() uint64 {
	return atomic.LoadUint64(&q.len)
}

hawkli-1994 avatar Aug 17 '24 10:08 hawkli-1994