gocookbook icon indicating copy to clipboard operation
gocookbook copied to clipboard

预防并发搞垮友军的几个方法

Open kevinyan815 opened this issue 3 years ago • 0 comments

巧用WaitGroup

因为go对并发的原生支持使得并发编程难度大大降低,刚学会Go语言的人特别喜欢在开发的时候尝试并发,其实并发并不是解决所有问题的银弹,反而是一味想尝试并发造成了不少线上BUG/事故。比如说,有的人会误以为起几十个线程休眠一下,再接着起线程就能控制并发数了,其实不是,比如像下面这么写

func badConcurrency() {
	batchSize := 50
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			break
		}

		for _, item := range data {
			go func(i int) {
				doSomething(i)
			}(item)
		}

		time.Sleep(time.Second * 1)
	}
}

对于调用者来说,看起来确实是控制了一秒中只发出去了50个请求,我们不能只从调用者的角度考虑事情,如果恰巧赶上对方正忙在,你程序休眠的时候下游服务并没有处理完你发过去的这批请求,这时你再发一批过去,累计下来无疑是对对方的服务器雪上加霜。最好的是等到上一批并发都返回了再去开启下一批,这个可以通过WaitGroup实现。

func useWaitGroup() {

	batchSize := 50
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}
		var wg sync.WaitGroup
		for _, item := range data {
			wg.Add(1)
			go func(i int) {
				doSomething(i)
				wg.Done()
			}(item)
		}
		wg.Wait()

		fmt.Println("Next bunch of data")
	}
}

巧用Semaphore

上面是一批处理完再开启下一批并发,也可以一个处理完后下一个补上,但同时发起的请求书最多不超过N个的限制,这个可以通过信号量实现。

func useSemaphore() {
	var concurrentNum int64 = 10
	var weight int64 = 1
	var batchSize int = 50
	s := semaphore.NewWeighted(concurrentNum)
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}

		for _, item := range data {
                        s.Acquire(context.Background(), weight)
			go func(i int) {
				doSomething(i)
				s.Release(weight)
			}(item)
		}

	}
}

使用限速器

再有就是使用限速器了,这个不像上面两个可以等到请求返回再开启下一个/一批,而是实实在在的限流,可以通过官方库提供的 time/rate 限流器实现。

func useRateLimit() {
	limiter := rate.NewLimiter(rate.Every(1*time.Second), 50)
	batchSize := 50
	for {
		data, _ :=queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}

		for _, item := range data {
			// blocking until the bucket have sufficient token
			err := limiter.Wait(context.Background())
			if err != nil {
				fmt.Println("Error: ", err)
				return
			}
			go func(i int) {
				doSomething(i)
			}(item)
		}
	}
}

使用生产者消费者模式

利用channel实现一个生产者消费者队列模式,生产者从库里捞数据发送到通道,消费者通过通道接收数据做处理。

func useChannel() {
	batchSize := 50
	dataChan := make(chan int)
	var wg sync.WaitGroup
	wg.Add(batchSize + 1)
	// 生产者
	go func() {
		for {
			data, _ := queryDataWithSizeN(batchSize)
			if len(data) == 0 {
				break
			}
			for _, item := range data {
				dataChan <- item
			}
		}
		close(dataChan)
		wg.Done()
	}()
    // 消费者
	go func() {
		for i := 0; i < 50; i++ {
			go func() {
				for {
					select {
					case v, ok := <- dataChan:
						if !ok {
							wg.Done()
							return
						}
						doSomething(v)
					}
				}
			}()
		}
	}()

	wg.Wait()
}

完整代码参考:https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go

kevinyan815 avatar Jun 28 '21 10:06 kevinyan815