介绍
前面我们学习了漏斗限流和令牌桶限流,那么你有没有觉得这两种算法存在一些问题呢?🤔一下
这两种算法存在的最大问题就是都需要提前设定阈值。那阈值如何设定呢?
比如常见的基于QPS来设定限流的阈值如何做? 一般我们通过压测来决定该值。
两种算法都存在以下一些缺点:
- 上线前进行严格压测
- 如果是测试环境压测,那和线上环境存在很大差异
- 机器配置和数量存在差异导致阈值不准确
- 依赖人工手动调整
那存在这么多的缺点,有没有什么好办法呢?
答案是有的,就是我们今天的主角: 自适应限流
自适应限流
自适应限流怎么做
前面我们遇到的主要问题就是每个服务实例的限流阈值实际应该是动态变化的,我们应该根据系统能够承载的最大吞吐量,来进行限流,当当前的流量大于最大吞吐的时候就限制流量进入,反之则允许通过。那现在的问题就是
- 系统的吞吐量该如何计算?
- 什么时候系统的吞吐量就是最大的吞吐量了?
对于自适应限流来说, 一般都是结合系统的 Load、CPU 使用率以及应用的入口 QPS、平均响应时间和并发量等几个维度的监控指标,通过自适应的流控策略, 让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性
本框架的自适应限流主要是: 通过综合分析服务的 cpu 使用率、请求成功的 qps 和请求成功的 rt 来做自适应限流保护。
源自 Kratos
先来说几个名字:
- cpu: 最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms
- inflight: 当前处理中正在处理的请求数量
- pass: 请求处理成功的量
- rt: 请求成功的响应耗时
限流公式
cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass MinRt windows / 1000) < InFlight
- MaxPass 表示最近 5s 内,单个采样窗口中最大的请求数
- MinRt 表示最近 5s 内,单个采样窗口中最小的响应时间
- windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10
中间件实现
func (b *RateLimiter) Limit() HandlerFunc {
return func(c *Context) {
uri := fmt.Sprintf("%s://%s%s", c.Request.URL.Scheme, c.Request.Host, c.Request.URL.Path)
limiter := b.group.Get(uri)
done, err := limiter.Allow(c)
if err != nil {
_metricServerBBR.Inc(uri, c.Request.Method)
c.JSON(nil, err)
c.Abort()
return
}
defer func() {
done(limit.DoneInfo{Op: limit.Success})
b.printStats(uri, limiter)
}()
c.Next()
}
}
使用方式
g := gin.New()
limiter := NewRateLimiter(nil)
g.Use(limiter.Limit())
g.GET("/api", myHandler)
源码实现
Allow
func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo), error) {
allowOpts := limit.DefaultAllowOpts()
for _, opt := range opts {
opt.Apply(&allowOpts)
}
if l.shouldDrop() { // 判断是否触发限流
return nil, ecode.LimitExceed
}
atomic.AddInt64(&l.inFlight, 1) // 增加正在处理请求数
stime := time.Since(initTime) // 记录请求到来的时间
return func(do limit.DoneInfo) {
rt := int64((time.Since(initTime) - stime) / time.Millisecond) // 请求处理成功的响应时长
l.rtStat.Add(rt) // 增加rtStat响应耗时的统计
atomic.AddInt64(&l.inFlight, -1) // 请求处理成功后, 减少正在处理的请求数
switch do.Op {
case limit.Success:
l.passStat.Add(1) // 处理成功后增加成功处理请求数的统计
return
default:
return
}
}, nil
}
shouldDrop
func (l *BBR) shouldDrop() bool {
// 判断目前cpu的使用率是否达到设置的CPU的限制, 默认值800
if l.cpu() < l.conf.CPUThreshold {
// 如果上一次舍弃请求的时间是0, 那么说明没有限流的需求, 直接返回
prevDrop, _ := l.prevDrop.Load().(time.Duration)
if prevDrop == 0 {
return false
}
// 如果上一次请求的时间与当前的请求时间小于1s, 那么说明有限流的需求
if time.Since(initTime)-prevDrop <= time.Second {
if atomic.LoadInt32(&l.prevDropHit) == 0 {
atomic.StoreInt32(&l.prevDropHit, 1)
}
// 增加正在处理的请求的数量
inFlight := atomic.LoadInt64(&l.inFlight)
// 判断正在处理的请求数是否达到系统的最大的请求数量
return inFlight > 1 && inFlight > l.maxFlight()
}
// 清空当前的prevDrop
l.prevDrop.Store(time.Duration(0))
return false
}
// 增加正在处理的请求的数量
inFlight := atomic.LoadInt64(&l.inFlight)
// 判断正在处理的请求数是否达到系统的最大的请求数量
drop := inFlight > 1 && inFlight > l.maxFlight()
if drop {
prevDrop, _ := l.prevDrop.Load().(time.Duration)
// 如果判断达到了最大请求数量, 并且当前有限流需求
if prevDrop != 0 {
return drop
}
l.prevDrop.Store(time.Since(initTime))
}
return drop
}
maxFlight
该函数是核心函数. 其计算公式: MaxPass MinRt windows / 1000. maxPASS/minRT都是基于metric.RollingCounter
来实现的, 限于篇幅原因这里就不再具体看其实现(想看的可以去看rolling_counter_test.go还是蛮容易理解的)
func (l *BBR) maxFlight() int64 {
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0 + 0.5))
}
winBucketPerSec: 每秒内的采样数量,其计算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默认值10s, conf.WinBucket默认值100. 简化下公式: 1/(10/100) = 10, 所以每秒内的采样数就是10
其他方法
// 单个采样窗口在一个采样周期中的最大的请求数, 默认的采样窗口是10s, 采样bucket数量100
func (l *BBR) maxPASS() int64 {
rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
return rawMaxPass
}
// 遍历100个采样bucket, 找到采样bucket中最大的请求数
rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
var result = 1.0
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
count := 0.0
for _, p := range bucket.Points {
count += p
}
result = math.Max(result, count)
}
return result
}))
if rawMaxPass == 0 {
rawMaxPass = 1
}
atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
return rawMaxPass
}
// 单个采样窗口中最小的响应时间
func (l *BBR) minRT() int64 {
rawMinRT := atomic.LoadInt64(&l.rawMinRt)
if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
return rawMinRT
}
// 遍历100个采样bucket, 找到采样bucket中最小的响应时间
rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
continue
}
total := 0.0
for _, p := range bucket.Points {
total += p
}
avg := total / float64(bucket.Count)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT <= 0 {
rawMinRT = 1
}
atomic.StoreInt64(&l.rawMinRt, rawMinRT)
return rawMinRT
}