gocookbook icon indicating copy to clipboard operation
gocookbook copied to clipboard

限流算法--滑动窗口

Open kevinyan815 opened this issue 4 years ago • 0 comments

算法思想

滑动窗口算法将一个大的时间窗口分成多个小窗口,每次大窗口向后滑动一个小窗口,并保证大的窗口内流量不会超出最大值,这种实现比固定窗口的流量曲线更加平滑。

普通时间窗口有一个问题,比如窗口期内请求的上限是100,假设有100个请求集中在前1s的后100ms,100个请求集中在后1s的前100ms,其实在这200ms内就已经请求超限了,但是由于时间窗每经过1s就会重置计数,就无法识别到这种请求超限。

对于滑动时间窗口,我们可以把1ms的时间窗口划分成10个小窗口,或者想象窗口有10个时间插槽time slot, 每个time slot统计某个100ms的请求数量。每经过100ms,有一个新的time slot加入窗口,早于当前时间1s的time slot出窗口。窗口内最多维护10个time slot。

图片

面临的问题

滑动窗口算法是固定窗口的一种改进,但从根本上并没有真正解决固定窗口算法的临界突发流量问题

代码实现

主要就是实现滑动窗口算法,不过滑动窗口算法一般是找出数组中连续k个元素的最大值,这里是已知最大值n (就是请求上限)如果超过最大值就不予通过。

可以参考Bilibili开源的kratos框架里circuit breaker用循环列表保存time slot对象的实现,他们这个实现的好处是不用频繁的创建和销毁time slot对象。下面给出一个简单的基本实现:

package main

import (
	"fmt"
	"sync"
	"time"
)



type timeSlot struct {
	timestamp time.Time // 这个timeSlot的时间起点
	count     int       // 落在这个timeSlot内的请求数
}

// 统计整个时间窗口中已经发生的请求次数
func countReq(win []*timeSlot) int {
	var count int
	for _, ts := range win {
		count += ts.count
	}
	return count
}

type SlidingWindowLimiter struct {
	mu           sync.Mutex    // 互斥锁保护其他字段
	SlotDuration time.Duration // time slot的长度
	WinDuration  time.Duration // sliding window的长度
	numSlots     int           // window内最多有多少个slot
	windows      []*timeSlot
	maxReq       int // 大窗口时间内允许的最大请求数
}

func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
	return &SlidingWindowLimiter{
		SlotDuration: slotDuration,
		WinDuration:  winDuration,
		numSlots:     int(winDuration / slotDuration),
		maxReq:       maxReq,
	}
}


func (l *SlidingWindowLimiter) validate() bool {
	l.mu.Lock()
	defer l.mu.Unlock()


	now := time.Now()
	// 已经过期的time slot移出时间窗
	timeoutOffset := -1
	for i, ts := range l.windows {
		if ts.timestamp.Add(l.WinDuration).After(now) {
			break
		}
		timeoutOffset = i
	}
	if timeoutOffset > -1 {
		l.windows = l.windows[timeoutOffset+1:]
	}

	// 判断请求是否超限
	var result bool
	if countReq(l.windows) < l.maxReq {
		result = true
	}

	// 记录这次的请求数
	var lastSlot *timeSlot
	if len(l.windows) > 0 {
		lastSlot = l.windows[len(l.windows)-1]
		if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
			// 如果当前时间已经超过这个时间插槽的跨度,那么新建一个时间插槽
			lastSlot = &timeSlot{timestamp: now, count: 1}
			l.windows = append(l.windows, lastSlot)
		} else {
			lastSlot.count++
		}
	} else {
		lastSlot = &timeSlot{timestamp: now, count: 1}
		l.windows = append(l.windows, lastSlot)
	}


	return result
}

func (l *SlidingWindowLimiter) LimitTest() string {
	if l.validate() {
		return "Accepted"
	} else {
		return "Ignored"
	}
}

func main() {
	limiter := NewSliding(100*time.Millisecond, time.Second, 10)
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.LimitTest())
	}
	time.Sleep(100 * time.Millisecond)
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.LimitTest())
	}
	fmt.Println(limiter.LimitTest())
	for _, v := range limiter.windows {
		fmt.Println(v.timestamp, v.count)
	}

	fmt.Println("moments later...")
	time.Sleep(time.Second)
	for i := 0; i < 7; i++ {
		fmt.Println(limiter.LimitTest())
	}
	for _, v := range limiter.windows {
		fmt.Println(v.timestamp, v.count)
	}
}

kevinyan815 avatar Dec 03 '20 07:12 kevinyan815