Blog icon indicating copy to clipboard operation
Blog copied to clipboard

限流

Open codcodog opened this issue 6 years ago • 1 comments

限流

问题触发的场景

在对接 IB 接口的时候,它们接口有一个严格的规定:

用户 1s 内的请求不能超过50个

在对接的时候,采用 Python协程 + 事件循环,也就是所谓的异步,每秒向 TWS 发送 50个请求,在请求 每天 级别的数据时,接口可以正常工作。

但,在请求 每5分钟 级别的数据时,接口不能正常工作:当请求达到一定的数量时就会触发接口的规定

ERROR:ib_insync.wrapper:Error 100, reqId 20: Max rate of messages per second has been exceeded:max=50 rec=85 (3)

很好奇 IB 是如何实现用户 1s 内的请求不能超过50个的?

问题扩展

在高并发系统下,有三把利器:缓存降级限流

  • 缓存

将常用数据缓存起来,减少数据库或者磁盘 IO

  • 降级

保护核心系统,降低非核心业务请求响应

  • 限流

在某一个时间窗口内对请求进行限速,保护系统

这里主要针对限流进行介绍,常见的限流算法有:

  • 计数器算法
  • 漏桶算法
  • 令牌桶算法

计数器算法

计数器法 就是为客户端的请求设置一个计数器 Counter, 记录客户端的请求量(QPS)

对于 1s 内用户的请求不能超过50个,可以这样简单实现:
一开始,我们为用户设置一个计数器 Counter,每当一个请求过来的时候,Counter 加1.
如果该请求与第一个请求的间隔时间在1s之内,但请求数 Counter 大于50的时候,则说明用户 1s 内的请求数过多了.
如果该请求与第一个请求的间隔时间大于1s,则重置 Counter.

代码实现

import time

class Counter:
    def __init__(self):
        ''' 初始化计数器
        @param  limit   单位时间请求最大值
        @param  interval 单位时间:秒
        '''
        self.req       = 0
        self.limit     = 50
        self.interval  = 1
        self.timestamp = time.time()

    def is_out_rate(self):
        ''' 是否超过限制
        '''
        now = time.time()
        delta = now - self.timestamp

        if delta <= self.interval:
            self.req += 1

            return self.req > self.limit
        else:
            # 超时重置
            self.timestamp = now
            self.req = 1

            return False

存在问题

计数器法 存在临界问题:
设置 1s 内用户的请求不超过50个,如果将粒度设置为 ms,则期望用户的请求:0.05 r/ms

~~假设有一个恶意用户,在 1s 瞬间发送了50个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了100个请求,也就是 100 r/ms~~
假设有一个恶意用户,在0s发送了1个请求,在1s瞬间发送了49个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了99个请求,也就是 99 r/ms 这样利用时间窗口的重置节点突发请求,就可以瞬间突破我们的速率限制.

对于设置秒级的Counter 来说,其实精度已经很高了,这里是为了说明需要,采用毫秒进行举例
日常开发中,如果是分钟,小时这种低精度的话,则需要考虑临界问题

临界问题 参考:滑动窗口

漏桶算法

漏桶算法:水(请求)流入到漏桶里,漏桶以一定的速度流出,当水流入速度过大则会直接溢出.

漏桶算法 天生就限制了请求的速度,并且不会存在临界问题.

081225378155003

首先有一个固定容量的桶,有水(请求)流进来和流出去。对于水(请求)流进来的速度不做限制,但固定水(请求)流出的速率,当桶满了之后,多余的水(请求)则溢出(拒绝/排队)。

代码实现

import time

class LeackyBucket:
    def __init__(self, capacity, out_rate):
        ''' 初始化漏桶
        @param  capacity    桶的容量
        @param  out_rate    流出的速率
        @param  water   当前的水的容量
        '''
        self.capacity  = float(capacity)
        self.out_rate  = float(out_rate)
        self._water    = 0
        self.timestamp = time.time()

    def consume(self, water):
        ''' 往漏桶加水
        '''
        if self._water + water > self.capacity:
            return False

        self._water += water
        return True

    @property
    def water(self):
        ''' 查看当前漏桶中的水量
        '''
        if self._water < self.capacity:
            now = time.time()
            delta = self.out_rate * (now - self.timestamp)
            self._water = self._water - delta
            self.timestamp = now

            return self._water
        else:
            return self.capacity


if __name__ == '__main__':
    bucket = LeackyBucket(50, 50)
    print('漏桶的容量:%.2f,当前桶的水量:%.2f,流出的速度是:%.2f r/s' % (bucket.capacity, bucket.water, bucket.out_rate))
    print('往桶加入50水量:%s' % bucket.consume(50))
    print('当前桶的水量:%.2f' % bucket.water)
    time.sleep(0.1)
    print('约0.1s之后,桶的水量是:%.2f' % bucket.water)
    time.sleep(0.2)
    print('约0.3s之后,桶的水量是:%.2f' % bucket.water)
    print('往桶加入40水量:%s' % bucket.consume(40))
[cryven@codcodog ~]$ python leaky_buckey.py 
漏桶的容量:50.00,当前桶的水量:-0.00,流出的速度是:50.00 r/s
往桶加入50水量:True
当前桶的水量:50.00
约0.1s之后,桶的水量是:44.99
约0.3s之后,桶的水量是:34.97
往桶加入40水量:False

令牌桶算法

令牌桶算法:系统以一个恒定的速度往桶里放入令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,若桶里没有令牌时,则拒绝服务.

081226107372877

有一个固定容量的桶,用来存放令牌(token)。一开始,桶是空的,令牌(token)以固定的速率 R 往桶里填充,如果桶满了,则多余的令牌被丢弃。
当一个请求过来的时候,首先从桶里获取令牌(token),获取成功后,请求才会被接受;若获取失败,则请求被拒绝,或者排队等待,直到获取令牌(token)成功。

代码实现

import time

class TokenBucket:
    def __init__(self, tokens, fill_rate):
        ''' 初始化令牌桶
        @param  tokens  令牌桶的大小
        @param  fill_rate   填充令牌的速度,tokens/second
        '''
        self.capacity  = float(tokens)
        self._tokens   = float(tokens)
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()

    def consume(self, tokens):
        ''' 消费令牌桶中的令牌
        桶中有足够的令牌则返回 True
        否则返回 False
        '''
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True

        return False

    @property
    def tokens(self):
        ''' 获取 bucket 的 token
        '''
        if self._tokens <= self.capacity:
            now = time.time()
            delta = self.fill_rate * (now - self.timestamp)
            self._tokens = min(self.capacity, self._tokens + delta)
            self.timestamp = now

            return self._tokens


if __name__ == '__main__':
    bucket = TokenBucket(50, 50)
    print('当前桶中的tokens:%.2f,填充速度是:%.2f t/s' % (bucket.tokens, bucket.fill_rate))
    print('消耗30个token:%s,当前tokens:%.2f' % (bucket.consume(30), bucket.tokens))
    time.sleep(0.1)
    print('约0.1秒后当前桶中的tokens:%.2f' % bucket.tokens)
    time.sleep(0.2)
    print('约0.3秒后当前桶中的tokens:%.2f' % bucket.tokens)
    print('消耗60个token:%s' % bucket.consume(60))
    print('当前桶中的tokens:%.2f' % bucket.tokens)
[cryven@codcodog ~]$ python token_bucket.py 
当前桶中的tokens:50.00,填充速度是:50.00 t/s
消耗30个token:True,当前tokens:20.00
约0.1秒后当前桶中的tokens:25.01
约0.3秒后当前桶中的tokens:35.03
消耗60个token:False
当前桶中的tokens:35.04

代码来源:IMPLEMENTATION OF THE TOKEN BUCKET ALGORITHM

总结

计数器 是最简单粗暴的,但当精度低的时候会存在 临界问题,可以使用 滑动窗口 进行调整.

漏桶算法令牌桶算法 最大的区别就是 令牌桶算法 允许流量一定程度的突发:如果桶内有50个Token时,那么可以瞬间允许50个请求通过,而 漏桶算法 无法做到.

最后,咨询和了解了下,感觉不是我们请求频率的问题,猜测 IB 做了类似反爬虫的策略,毕竟2800多个股短时间内请求分钟颗粒的数据,估计当我们请求的量达到一定的程度时,IB 就开始拒绝我们的请求了,毕竟它不是专业的数据提供商.

参考文章
接口限流算法总结

codcodog avatar Jan 23 '18 15:01 codcodog

假设有一个恶意用户,在 1s 瞬间发送了50个请求,在 1.001s 又瞬间发送了50个请求,则在 0.001s(1ms) 发送了100个请求,也就是 100 r/ms

应该是按第一次请求开始算时间吧?比如说:0s 的时候发送第一次请求,在0.999s 的时候发送剩余49次。

JChehe avatar Jan 24 '18 02:01 JChehe