gocookbook icon indicating copy to clipboard operation
gocookbook copied to clipboard

并发编程--信号量

Open kevinyan815 opened this issue 4 years ago • 0 comments

信号量是并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,维基百科对信号量的解释如下。

信号量的概念是计算机科学家 Dijkstra (Dijkstra算法的发明者)提出来的,广泛应用在不同的操作系统中。系统中,会给每一个进程一个信号量,代表每个进程当前的状态,未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的0或1,称为二进制信号量(binary semaphore)。在linux系统中,二进制信号量(binary semaphore)又称互斥锁(Mutex)

计数信号量具备两种操作动作,称为V(signal())与P(wait())(即部分参考书常称的“PV操作”)。V操作会增加信号标S的数值,P操作会减少它。

运行方式:

  1. 初始化信号量,给与它一个非负数的整数值。
  2. 运行P(wait()),信号标S的值将被减少。企图进入临界区的进程,需要先运行P(wait())。当信号标S减为负值时,进程会被阻塞住,不能继续;当信号标S不为负值时,进程可以获准进入临界区。
  3. 运行V(signal()),信号标S的值会被增加。结束离开临界区的进程,将会运行V(signal())。当信号标S不为负值时,先前被阻塞住的其他进程,将可获准进入临界区

我们一般用信号量保护一组资源,每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来。当遇到计数器大于信号量大小时就会进入休眠等待其他线程释放信号。如果信号量是只有0和1的二进位信号量,那么,它的 P/V 就和互斥锁的 Lock/Unlock 就一样了。

Go语言中的信号量表示

Go 内部使用信号量来控制goroutine的阻塞和唤醒,比如互斥锁sync.Mutex的内部实现

type Mutex struct {
    state int32
    sema  uint32
}

信号量的PV操作在Go内部是通过下面这几个底层函数实现的

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

上面几个函数都是Go语言内部使用的,不能在编程时直接使用。不过Go 语言的扩展并发原语包中提供了带权重的信号量 semaphore.Weighted

使用信号量semaphore.Weighted 前,需先在项目里安装golang.org/x/sync/

安装方法:go get -u golang.org/x/sync

我们可以按照不同的权重对资源的访问进行管理,这个结构体对外提供了四个方法:

  • semaphore.NewWeighted 用于创建新的信号量,通过参数(n int64) 指定信号量的初始值。
  • semaphore.Weighted.Acquire 阻塞地获取指定权重的资源,如果当前没有空闲资源,就会陷入休眠等待;相当于 P 操作,你可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变。
  • semaphore.Weighted.Release 用于释放指定权重的资源;相当于 V 操作,可以将 n 个资源释放,返还给信号量。
  • semaphore.Weighted.TryAcquire 非阻塞地获取指定权重的资源,如果当前没有空闲资源,就会直接返回 false

在Go语言里使用信号量

在实际的场景中,我们应该怎么用呢?我来举个例子,来帮助你理解。假设我们有一组要抓取的页面,资源有限最多允许我们同时执行三个抓取任务,当同时有三个抓取任务在执行时,在执行完一个抓取任务后才能执行下一个排队等待的任务。当然这个问题用Channel也能解决,使用信号量原语来解决这个问题的代码如下:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func doSomething(u string) {// 模拟抓取任务的执行
    fmt.Println(u)
    time.Sleep(2 * time.Second)
}

const (
    Limit  = 3 // 同時并行运行的goroutine上限
    Weight = 1 // 每个goroutine获取信号量资源的权重
)

func main() {
    urls := []string{
        "http://www.example.com",
        "http://www.example.net",
        "http://www.example.net/foo",
        "http://www.example.net/bar",
        "http://www.example.net/baz",
    }
    s := semaphore.NewWeighted(Limit)
    var w sync.WaitGroup
    for _, u := range urls {
        w.Add(1)
        go func(u string) {
            s.Acquire(context.Background(), Weight)
            doSomething(u)
            s.Release(Weight)
            w.Done()
        }(u)
    }
    w.Wait()
    
    fmt.Println("All Done")
}

kevinyan815 avatar Dec 08 '20 01:12 kevinyan815