Blog
Blog copied to clipboard
限流
限流
问题触发的场景
在对接 IB
接口的时候,它们接口有一个严格的规定:
用户 1s 内的请求不能超过50个
在对接的时候,采用 Python
的 协程
+ 事件循环
,也就是所谓的异步
,每秒向 TWS
发送 50个请求,在请求 每天
级别的数据时,接口可以正常工作。
但,在请求 每5分钟
级别的数据时,接口不能正常工作:当请求达到一定的数量时就会触发接口的规定
ERROR:ib_insync.wrapper:Error 100, reqId 20: Max rate of messages per second has been exceeded:max=50 rec=85 (3)
很好奇 IB
是如何实现用户 1s 内的请求不能超过50个的?
问题扩展
在高并发系统下,有三把利器:缓存,降级,限流
- 缓存
将常用数据缓存起来,减少数据库或者磁盘 IO
- 降级
保护核心系统,降低非核心业务请求响应
- 限流
在某一个时间窗口内对请求进行限速,保护系统
这里主要针对限流进行介绍,常见的限流算法有:
- 计数器算法
- 漏桶算法
- 令牌桶算法
计数器算法
计数器法
就是为客户端的请求设置一个计数器 Counter
, 记录客户端的请求量(QPS)
对于 1s 内用户的请求不能超过50个,可以这样简单实现:
一开始,我们为用户设置一个计数器 Counter
,每当一个请求过来的时候,Counter
加1.
如果该请求与第一个请求的间隔时间在1s之内,但请求数 Counter
大于50的时候,则说明用户 1s 内的请求数过多了.
如果该请求与第一个请求的间隔时间大于1s,则重置 Counter
.
代码实现
import time
class Counter:
def __init__(self):
''' 初始化计数器
@param limit 单位时间请求最大值
@param interval 单位时间:秒
'''
self.req = 0
self.limit = 50
self.interval = 1
self.timestamp = time.time()
def is_out_rate(self):
''' 是否超过限制
'''
now = time.time()
delta = now - self.timestamp
if delta <= self.interval:
self.req += 1
return self.req > self.limit
else:
# 超时重置
self.timestamp = now
self.req = 1
return False
存在问题
计数器法
存在临界问题:
设置 1s 内用户的请求不超过50个,如果将粒度设置为 ms
,则期望用户的请求:0.05 r/ms
~~假设有一个恶意用户,在 1s 瞬间发送了50个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了100个请求,也就是 100 r/ms~~
假设有一个恶意用户,在0s发送了1个请求,在1s瞬间发送了49个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了99个请求,也就是 99 r/ms
这样利用时间窗口的重置节点突发请求,就可以瞬间突破我们的速率限制.
对于设置秒级的
Counter
来说,其实精度已经很高了,这里是为了说明需要,采用毫秒进行举例
日常开发中,如果是分钟,小时这种低精度的话,则需要考虑临界问题
临界问题
参考:滑动窗口
漏桶算法
漏桶算法
:水(请求)流入到漏桶里,漏桶以一定的速度流出,当水流入速度过大则会直接溢出.
漏桶算法 天生就限制了请求的速度,并且不会存在临界问题.
首先有一个固定容量的桶,有水(请求)流进来和流出去。对于水(请求)流进来的速度不做限制,但固定水(请求)流出的速率,当桶满了之后,多余的水(请求)则溢出(拒绝/排队)。
代码实现
import time
class LeackyBucket:
def __init__(self, capacity, out_rate):
''' 初始化漏桶
@param capacity 桶的容量
@param out_rate 流出的速率
@param water 当前的水的容量
'''
self.capacity = float(capacity)
self.out_rate = float(out_rate)
self._water = 0
self.timestamp = time.time()
def consume(self, water):
''' 往漏桶加水
'''
if self._water + water > self.capacity:
return False
self._water += water
return True
@property
def water(self):
''' 查看当前漏桶中的水量
'''
if self._water < self.capacity:
now = time.time()
delta = self.out_rate * (now - self.timestamp)
self._water = self._water - delta
self.timestamp = now
return self._water
else:
return self.capacity
if __name__ == '__main__':
bucket = LeackyBucket(50, 50)
print('漏桶的容量:%.2f,当前桶的水量:%.2f,流出的速度是:%.2f r/s' % (bucket.capacity, bucket.water, bucket.out_rate))
print('往桶加入50水量:%s' % bucket.consume(50))
print('当前桶的水量:%.2f' % bucket.water)
time.sleep(0.1)
print('约0.1s之后,桶的水量是:%.2f' % bucket.water)
time.sleep(0.2)
print('约0.3s之后,桶的水量是:%.2f' % bucket.water)
print('往桶加入40水量:%s' % bucket.consume(40))
[cryven@codcodog ~]$ python leaky_buckey.py
漏桶的容量:50.00,当前桶的水量:-0.00,流出的速度是:50.00 r/s
往桶加入50水量:True
当前桶的水量:50.00
约0.1s之后,桶的水量是:44.99
约0.3s之后,桶的水量是:34.97
往桶加入40水量:False
令牌桶算法
令牌桶算法
:系统以一个恒定的速度往桶里放入令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,若桶里没有令牌时,则拒绝服务.
有一个固定容量的桶,用来存放令牌(token)。一开始,桶是空的,令牌(token)以固定的速率 R 往桶里填充,如果桶满了,则多余的令牌被丢弃。
当一个请求过来的时候,首先从桶里获取令牌(token),获取成功后,请求才会被接受;若获取失败,则请求被拒绝,或者排队等待,直到获取令牌(token)成功。
代码实现
import time
class TokenBucket:
def __init__(self, tokens, fill_rate):
''' 初始化令牌桶
@param tokens 令牌桶的大小
@param fill_rate 填充令牌的速度,tokens/second
'''
self.capacity = float(tokens)
self._tokens = float(tokens)
self.fill_rate = float(fill_rate)
self.timestamp = time.time()
def consume(self, tokens):
''' 消费令牌桶中的令牌
桶中有足够的令牌则返回 True
否则返回 False
'''
if tokens <= self._tokens:
self._tokens -= tokens
return True
return False
@property
def tokens(self):
''' 获取 bucket 的 token
'''
if self._tokens <= self.capacity:
now = time.time()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens
if __name__ == '__main__':
bucket = TokenBucket(50, 50)
print('当前桶中的tokens:%.2f,填充速度是:%.2f t/s' % (bucket.tokens, bucket.fill_rate))
print('消耗30个token:%s,当前tokens:%.2f' % (bucket.consume(30), bucket.tokens))
time.sleep(0.1)
print('约0.1秒后当前桶中的tokens:%.2f' % bucket.tokens)
time.sleep(0.2)
print('约0.3秒后当前桶中的tokens:%.2f' % bucket.tokens)
print('消耗60个token:%s' % bucket.consume(60))
print('当前桶中的tokens:%.2f' % bucket.tokens)
[cryven@codcodog ~]$ python token_bucket.py
当前桶中的tokens:50.00,填充速度是:50.00 t/s
消耗30个token:True,当前tokens:20.00
约0.1秒后当前桶中的tokens:25.01
约0.3秒后当前桶中的tokens:35.03
消耗60个token:False
当前桶中的tokens:35.04
代码来源:IMPLEMENTATION OF THE TOKEN BUCKET ALGORITHM
总结
计数器
是最简单粗暴的,但当精度低的时候会存在 临界问题
,可以使用 滑动窗口
进行调整.
漏桶算法
和 令牌桶算法
最大的区别就是 令牌桶算法
允许流量一定程度的突发:如果桶内有50个Token时,那么可以瞬间允许50个请求通过,而 漏桶算法
无法做到.
最后,咨询和了解了下,感觉不是我们请求频率的问题,猜测 IB
做了类似反爬虫的策略,毕竟2800多个股短时间内请求分钟颗粒的数据,估计当我们请求的量达到一定的程度时,IB
就开始拒绝我们的请求了,毕竟它不是专业的数据提供商.
参考文章
接口限流算法总结
假设有一个恶意用户,在 1s 瞬间发送了50个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了100个请求,也就是 100 r/ms
应该是按第一次请求开始算时间吧?比如说:0s 的时候发送第一次请求,在0.999s 的时候发送剩余49次。