blog icon indicating copy to clipboard operation
blog copied to clipboard

Go经典入门22:缓冲信道和工作池

Open kaindy7633 opened this issue 3 years ago • 0 comments

Go经典入门22:缓冲信道和工作池

什么是缓冲信道?

在上一教程里,我们讨论的主要是无缓冲信道。我们在 Channel 的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。

我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

通过向 make 函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。

ch := make(chan type, capacity)

要让一个信道有缓冲,上面语法中的 capacity 应该大于 0。无缓冲信道的容量默认为 0,因此我们在上一教程创建信道时,省略了容量参数。

我们开始编写代码,创建一个缓冲信道。

示例一

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 2)  // 缓冲信道 ,容量2
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:

// naveen
// paul

示例二

我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}

func main() {
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)
    }
}

在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ch,而第 17 行把 ch 传递给了 write 协程。接下来 Go 主协程休眠了两秒。在这期间,write 协程在并发地运行。write 协程有一个 for 循环,依次向信道 ch 写入 0~4。而缓冲信道的容量为 2,因此 write 协程里立即会向 ch 写入 0 和 1,接下来发生阻塞,直到 ch 内的值被读取。因此,该程序立即打印出下面两行:

// successfully wrote 0 to ch
// successfully wrote 1 to ch

打印上面两行之后,write 协程中向 ch 的写入发生了阻塞,直到 ch 有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ch,打印出了读取到的值后又休眠两秒,这个循环一直到 ch 关闭才结束。所以该程序在两秒后会打印下面两行:

// read value 0 from ch
// successfully wrote 2 to ch

该过程会一直进行,直到信道读取完所有的值,并在 write 协程中关闭信道。最终输出如下:

// successfully wrote 0 to ch
// successfully wrote 1 to ch
// read value 0 from ch
// successfully wrote 2 to ch
// read value 1 from ch
// successfully wrote 3 to ch
// read value 2 from ch
// successfully wrote 4 to ch
// read value 3 from ch
// read value 4 from ch

死锁

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:

// fatal error: all goroutines are asleep - deadlock!
// 
// goroutine 1 [chan send]:
// main.main()
//  /tmp/sandbox274756028/main.go:11 +0x100

长度 vs 容量

缓冲信道的容量是指信道可以存储的值的数量。我们在使用 make 函数创建缓冲信道的时候会指定容量大小。

缓冲信道的长度是指信道中当前排队的元素个数。

代码可以把一切解释得很清楚。:)

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:

// capacity is 3
// length is 2
// read value naveen
// new length is 1

WaitGroup

在本教程的下一节里,我们会讲到工作池(Worker Pools)。而 WaitGroup 用于实现工作池,因此要理解工作池,我们首先需要学习 WaitGroup

WaitGroup 用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup 来实现。

理论说完了,我们编写点儿代码吧。:)

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroup 是一个结构体类型,我们在第 18 行创建了 WaitGroup 类型的变量,其初始值为零值。WaitGroup 使用计数器来工作。当我们调用 WaitGroupAdd 并传递一个 int 时,WaitGroup 的计数器会加上 Add 的传参。要减少计数器,可以调用 WaitGroupDone() 方法。Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。

上述程序里,for 循环迭代了 3 次,我们在循环内调用了 wg.Add(1)(第 20 行)。因此计数器变为 3。for 循环同样创建了 3 个 process 协程,然后在第 23 行调用了 wg.Wait(),确保 Go 主协程等待计数器变为 0。在第 13 行,process 协程内调用了 wg.Done,可以让计数器递减。一旦 3 个子协程都执行完毕(即 wg.Done() 调用了 3 次),那么计数器就变为 0,于是主协程会解除阻塞。

在第 21 行里,传递 wg 的地址是很重要的。如果没有传递 wg 的地址,那么每个 Go 协程将会得到一个 WaitGroup 值的拷贝,因而当它们执行结束时,main 函数并不会知道。

该程序输出:

// started Goroutine  2
// started Goroutine  0
// started Goroutine  1
// Goroutine 0 ended
// Goroutine 2 ended
// Goroutine 1 ended
// All go routines finished executing

由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)

工作池的实现

缓冲信道的重要应用之一就是实现工作池。

一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

我们工作池的核心功能如下:

  • 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
  • 将作业添加到该输入型缓冲信道中。
  • 作业完成后,再将结果写入一个输出型缓冲信道。
  • 从输出型缓冲信道读取并打印结果。

我们会逐步编写这个程序,让代码易于理解。

第一步就是创建一个结构体,表示作业和结果。

type Job struct {
    id       int
    randomno int
}

type Result struct {
    job         Job
    sumofdigits int
}

所有 Job 结构体变量都会有 idrandomno 两个字段,randomno 用于计算其每位数之和。

Result 结构体有一个 job 字段,表示所对应的作业,还有一个 sumofdigits 字段,表示计算的结果(每位数字之和)。

第二步是分别创建用于接收作业和写入结果的缓冲信道。

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

工作协程(Worker Goroutine)会监听缓冲信道 jobs 里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 results

如下所示,digits 函数的任务实际上就是计算整数的每一位之和,最后返回该结果。为了模拟出 digits 在计算过程中花费了一段时间,我们在函数内添加了两秒的休眠时间。

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }

    time.Sleep(2 * time.Second)
    return sum
}

然后,我们写一个创建工作协程的函数。

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

上面的函数创建了一个工作者(Worker),读取 jobs 信道的数据,根据当前的 jobdigits 函数的返回值,创建了一个 Result 结构体变量,然后将结果写入 results 缓冲信道。worker 函数接收了一个 WaitGroup 类型的 wg 作为参数,当所有的 jobs 完成的时候,调用了 Done() 方法。

createWorkerPool 函数创建了一个 Go 协程的工作池。

func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 wg.Add(1) 方法,于是 WaitGroup 计数器递增。接下来,我们创建工作协程,并向 worker 函数传递 wg 的地址。创建了需要的工作协程后,函数调用 wg.Wait(),等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 results 信道。因为所有协程都已经执行完毕,于是不再需要向 results 信道写入数据了。

现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

上面的 allocate 函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 Job 结构体变量。这个函数把 for 循环的计数器 i 作为 id,最后把创建的结构体变量写入 jobs 信道。当写入所有的 job 时,它关闭了 jobs 信道。

下一步是创建一个读取 results 信道和打印输出的函数。

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

result 函数读取 results 信道,并打印出 jobid、输入的随机数、该随机数的每位数之和。result 函数也接受 done 信道作为参数,当打印所有结果时,done 会被写入 true

现在一切准备充分了。我们继续完成最后一步,在 main() 函数中调用上面所有的函数。

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

我们首先在 main 函数的第 2 行,保存了程序的起始时间,并在最后一行(第 12 行)计算了 endTimestartTime 的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来做一点基准指标(Benchmark),所以需要这么做。

我们把 noOfJobs 设置为 100,接下来调用了 allocate,向 jobs 信道添加作业。

我们创建了 done 信道,并将其传递给 result 协程。于是该协程会开始打印结果,并在完成打印时发出通知。

通过调用 createWorkerPool 函数,我们最终创建了一个有 10 个协程的工作池。main 函数会监听 done 信道的通知,等待所有结果打印结束。

为了便于参考,下面是整个程序。我还引用了必要的包。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}

type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

func createWorkerPool(noOfWorkers int) {
 var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

为了更精确地计算总时间,请在你的本地机器上运行该程序。

该程序输出:

// Job id 1, input random no 636, sum of digits 15
// Job id 0, input random no 878, sum of digits 23
// Job id 9, input random no 150, sum of digits 6
// ...
// total time taken  20.01081009 seconds

程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。

现在我们把 main 函数里的 noOfWorkers 增加到 20。我们把工作者的数量加倍了。由于工作协程增加了(准确说来是两倍),因此程序花费的总时间会减少(准确说来是一半)。在我的例子里,程序会打印出 10.004364685 秒。

// ...
// total time taken  10.004364685 seconds

现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。你们可以练习一下:在 main 函数里修改 noOfJobsnoOfWorkers 的值,并试着去分析一下结果。


下一节:Go 经典入门 23:Select

kaindy7633 avatar Mar 15 '21 12:03 kaindy7633