gocookbook
gocookbook copied to clipboard
限流算法--滑动窗口
算法思想
滑动窗口算法将一个大的时间窗口分成多个小窗口,每次大窗口向后滑动一个小窗口,并保证大的窗口内流量不会超出最大值,这种实现比固定窗口的流量曲线更加平滑。
普通时间窗口有一个问题,比如窗口期内请求的上限是100,假设有100个请求集中在前1s的后100ms,100个请求集中在后1s的前100ms,其实在这200ms内就已经请求超限了,但是由于时间窗每经过1s就会重置计数,就无法识别到这种请求超限。
对于滑动时间窗口,我们可以把1ms的时间窗口划分成10个小窗口,或者想象窗口有10个时间插槽time slot, 每个time slot统计某个100ms的请求数量。每经过100ms,有一个新的time slot加入窗口,早于当前时间1s的time slot出窗口。窗口内最多维护10个time slot。
面临的问题
滑动窗口算法是固定窗口的一种改进,但从根本上并没有真正解决固定窗口算法的临界突发流量问题
代码实现
主要就是实现滑动窗口算法,不过滑动窗口算法一般是找出数组中连续k个元素的最大值,这里是已知最大值n (就是请求上限)如果超过最大值就不予通过。
可以参考Bilibili开源的kratos框架里circuit breaker用循环列表保存time slot对象的实现,他们这个实现的好处是不用频繁的创建和销毁time slot对象。下面给出一个简单的基本实现:
package main
import (
"fmt"
"sync"
"time"
)
type timeSlot struct {
timestamp time.Time // 这个timeSlot的时间起点
count int // 落在这个timeSlot内的请求数
}
// 统计整个时间窗口中已经发生的请求次数
func countReq(win []*timeSlot) int {
var count int
for _, ts := range win {
count += ts.count
}
return count
}
type SlidingWindowLimiter struct {
mu sync.Mutex // 互斥锁保护其他字段
SlotDuration time.Duration // time slot的长度
WinDuration time.Duration // sliding window的长度
numSlots int // window内最多有多少个slot
windows []*timeSlot
maxReq int // 大窗口时间内允许的最大请求数
}
func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
SlotDuration: slotDuration,
WinDuration: winDuration,
numSlots: int(winDuration / slotDuration),
maxReq: maxReq,
}
}
func (l *SlidingWindowLimiter) validate() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// 已经过期的time slot移出时间窗
timeoutOffset := -1
for i, ts := range l.windows {
if ts.timestamp.Add(l.WinDuration).After(now) {
break
}
timeoutOffset = i
}
if timeoutOffset > -1 {
l.windows = l.windows[timeoutOffset+1:]
}
// 判断请求是否超限
var result bool
if countReq(l.windows) < l.maxReq {
result = true
}
// 记录这次的请求数
var lastSlot *timeSlot
if len(l.windows) > 0 {
lastSlot = l.windows[len(l.windows)-1]
if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
// 如果当前时间已经超过这个时间插槽的跨度,那么新建一个时间插槽
lastSlot = &timeSlot{timestamp: now, count: 1}
l.windows = append(l.windows, lastSlot)
} else {
lastSlot.count++
}
} else {
lastSlot = &timeSlot{timestamp: now, count: 1}
l.windows = append(l.windows, lastSlot)
}
return result
}
func (l *SlidingWindowLimiter) LimitTest() string {
if l.validate() {
return "Accepted"
} else {
return "Ignored"
}
}
func main() {
limiter := NewSliding(100*time.Millisecond, time.Second, 10)
for i := 0; i < 5; i++ {
fmt.Println(limiter.LimitTest())
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 5; i++ {
fmt.Println(limiter.LimitTest())
}
fmt.Println(limiter.LimitTest())
for _, v := range limiter.windows {
fmt.Println(v.timestamp, v.count)
}
fmt.Println("moments later...")
time.Sleep(time.Second)
for i := 0; i < 7; i++ {
fmt.Println(limiter.LimitTest())
}
for _, v := range limiter.windows {
fmt.Println(v.timestamp, v.count)
}
}