cinatra icon indicating copy to clipboard operation
cinatra copied to clipboard

限流功能

Open paimus opened this issue 3 months ago • 18 comments

  • 实际的业务场景是高并发推理场景,很容出现突发的大批请求,这个时候k8s根本来不及扩容的情况下,服务直接就不可用了。如果有限流能力,可以保证以现有的计算规模尽量的处理已经接收到请求。直到整个服务集群扩容完成。
  • 另外,发现以前就有这个实现的历史记录,https://github.com/qicosmos/cinatra/commit/0b1ee8b81e69bf334cce7fca8c5fc3dc2cb972ea#diff-772ec56bafe25b66135b2112ba101fff987d0766ffe98e5edde330c55842fc9e

paimus avatar Sep 09 '25 02:09 paimus

SmoothBursty 是我目前的业务场景,前面的历史实现,我还没有细看

paimus avatar Sep 09 '25 02:09 paimus

不过,在高并发的场景下,我们观测到的一个现象是,如果server端直接断开连接,反而会触发client端的重连机制,导致更大的请求和压力。所以在拒绝的时候,给一个应用层面的快捷返回,可能是更好的方式。

paimus avatar Sep 09 '25 02:09 paimus

client端也是cinatra吗

qicosmos avatar Sep 09 '25 02:09 qicosmos

client端也是cinatra吗

不是的,client的是java、golang等其他server程序

paimus avatar Sep 09 '25 03:09 paimus

client端也是cinatra吗

不是的,client的是java、golang等其他server程序

或者说,client的行为,基本上是没法约束的

paimus avatar Sep 09 '25 03:09 paimus

忽然想到另外一种方法,在accept的时候去做限制,如果达到某个上限了就立即关闭accept的连接,这种方式会不会更好?

qicosmos avatar Sep 09 '25 03:09 qicosmos

忽然想到另外一种方法,在accept的时候去做限制,如果达到某个上限了就立即关闭accept的连接,这种方式会不会更好?

结合我的经验,还是会引发风暴(本质上就是拒绝了client,只要拒绝换来的就是更多的请求重试,风暴就来了)。而且accept关闭,还会影响k8s的心跳检活。

paimus avatar Sep 09 '25 03:09 paimus

我目前的想法,尽量的满足的这个请求,不过server在达到限流的判定条件后,直接返回一个应用层的默认值(避免client来重试)。在这个过程中,k8s会扩容,后续压力就自然下来了

paimus avatar Sep 09 '25 03:09 paimus

我目前的想法,尽量的满足的这个请求,不过server在达到限流的判定条件后,直接返回一个应用层的默认值(避免client来重试)。在这个过程中,k8s会扩容,后续压力就自然下来了

那你在业务函数里去做不就好了,太繁忙了就不做业务,直接返回一个200之类的回去就好了。

qicosmos avatar Sep 09 '25 03:09 qicosmos

我目前的想法,尽量的满足的这个请求,不过server在达到限流的判定条件后,直接返回一个应用层的默认值(避免client来重试)。在这个过程中,k8s会扩容,后续压力就自然下来了

那你在业务函数里去做不就好了,太繁忙了就不做业务,直接返回一个200之类的回去就好了。

是的,我原来也是这么想的,统计一个qps,超过了就直接返回。就想着cinatra本身是否有原生支持或者更优雅的方法,所以社区里面问了问,如果框架本身有这个计划,我就follow

paimus avatar Sep 09 '25 03:09 paimus

巧的是,看到何老师也有实现这个feat,所以非常感兴趣

paimus avatar Sep 09 '25 03:09 paimus

按照我的实现方式,就需要先做一个统计,然后再判定再返回。如果框架有ratelimit,那就不一样了,做一个ratelimit设置,然后自定义一个默认的返回,就完事儿!

paimus avatar Sep 09 '25 03:09 paimus

@helintongh 你也说说想法,看看怎么整合起来。

qicosmos avatar Sep 09 '25 03:09 qicosmos

按照我的实现方式,就需要先做一个统计,然后再判定再返回。如果框架有ratelimit,那就不一样了,做一个ratelimit设置,然后自定义一个默认的返回,就完事儿!

per connection的令牌还是整体的令牌

helintongh avatar Sep 09 '25 08:09 helintongh

按照我的实现方式,就需要先做一个统计,然后再判定再返回。如果框架有ratelimit,那就不一样了,做一个ratelimit设置,然后自定义一个默认的返回,就完事儿!

per connection的令牌还是整体的令牌

@helintongh 整体的

paimus avatar Sep 10 '25 07:09 paimus

最终效果大概下面这样,提供两个接口一个allow立即返回给server用。一个wait会阻塞直到有新令牌满足此次请求所需令牌数(多数情况这个接口给client用)

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
    // 创建一个限流器:
    // r: 每秒产生的令牌数 (Rate)
    // b: 桶的最大容量 (Burst size),即允许的瞬时最大突发流量
	limiter := rate.NewLimiter(10, 20) // 每秒10个,桶容量20个

	for i := 0; i < 25; i++ {
        // 方法1: Wait 会阻塞,直到有令牌可用(或上下文取消)
		err := limiter.Wait(context.Background())
		if err != nil {
			panic(err)
		}
		fmt.Printf("Request %d: %s\n", i, time.Now().Format("15:04:05.000"))
	}

    // 方法2: Allow 立即返回,判断当前是否有令牌可用
    if limiter.Allow() {
        fmt.Println("Allowed to process!")
    } else {
        fmt.Println("Rate limited!")
    }

    // 方法3: Reserve 预约一个未来的令牌,返回一个预约对象,可以告诉你需要等多久
    r := limiter.Reserve()
    if !r.OK() {
        // Not allowed to act! Did you remember to set limiter.burst > 0 ?
        return
    }
    time.Sleep(r.Delay()) // 等待建议的延迟时间
    fmt.Println("Reserved operation after delay")
}

helintongh avatar Sep 15 '25 02:09 helintongh

limiter := rate.NewLimiter(10, 20) 这行的意思是最大并发不能超过20是吗?

qicosmos avatar Sep 15 '25 03:09 qicosmos

limiter := rate.NewLimiter(10, 20) 这行的意思是最大并发不能超过20是吗?

是的,具体就是rlimiter需要有最大容量防止流量打崩服务器

helintongh avatar Sep 15 '25 04:09 helintongh