不管是在单体服务中还是在微服务中,开发者为前端提供的API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性。即接口也需要安装上保险丝,以防止非预期的请求对系统压力过大而引起的系统瘫痪。
本文就来介绍一下 periodlimit 。
使用
const (seconds = 1total = 100quota = 5)// New limiterl := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")// take sourcecode, err := l.Take("first")if err != nil {logx.Error(err)return true}// switch val => process requestswitch code {case limit.OverQuota:logx.Errorf("OverQuota key: %v", key)return falsecase limit.Allowed:logx.Infof("AllowedQuota key: %v", key)return truecase limit.HitQuota:logx.Errorf("HitQuota key: %v", key)// todo: maybe we need to let users know they hit the quotareturn falsedefault:logx.Errorf("DefaultQuota key: %v", key)// unknown response, we just let the sms goreturn true}
periodlimit
go-zero 采取 滑动窗口 计数的方式,计算一段时间内对同一个资源的访问次数,如果超过指定的 limit ,则拒绝访问。当然如果你是在一段时间内访问不同的资源,每一个资源访问量都不超过 limit ,此种情况是允许大量请求进来的。
而在一个分布式系统中,存在多个微服务提供服务。所以当瞬间的流量同时访问同一个资源,如何让计数器在分布式系统中正常计数? 同时在计算资源访问时,可能会涉及多个计算,如何保证计算的原子性?
- go-zero 借助 redis 的 incrby 做资源访问计数
 - 采用 lua script 做整个窗口计算,保证计算的原子性
 
下面来看看 lua script 控制的几个关键属性:
| argument | mean | 
|---|---|
| key[1] | 访问资源的标示 | 
| ARGV[1] | limit => 请求总数,超过则限速。可设置为 QPS | 
| ARGV[2] | window大小 => 滑动窗口,用 ttl 模拟出滑动的效果 | 
-- to be compatible with aliyun redis,-- we cannot use `local key = KEYS[1]` to reuse thekeylocal limit = tonumber(ARGV[1])local window = tonumber(ARGV[2])-- incrbt key 1 => key visis++local current = redis.call("INCRBY", KEYS[1], 1)-- 如果是第一次访问,设置过期时间 => TTL = window size-- 因为是只限制一段时间的访问次数if current == 1 thenredis.call("expire", KEYS[1], window)return 1elseif current < limit thenreturn 1elseif current == limit thenreturn 2elsereturn 0end
至于上述的 return code ,返回给调用方。由调用方来决定请求后续的操作:
| return code | tag | call code | mean | 
|---|---|---|---|
| 0 | OverQuota | 3 | over limit | 
| 1 | Allowed | 1 | in limit | 
| 2 | HitQuota | 2 | hit limit | 
下面这张图描述了请求进入的过程,以及请求触发 limit 时后续发生的情况: 
 
后续处理
如果在服务某个时间点,请求大批量打进来,periodlimit 短期时间内达到 limit 阈值,而且设置的时间范围还远远没有到达。后续请求的处理就成为问题。
periodlimit 中并没有处理,而是返回 code 。把后续请求的处理交给了开发者自己处理。
- 如果不做处理,那就是简单的将请求拒绝
 - 如果需要处理这些请求,开发者可以借助 mq 将请求缓冲,减缓请求的压力
 - 采用 tokenlimit,允许暂时的流量冲击
 
总结
go-zero 中的 periodlimit 限流方案是基于 redis 计数器,通过调用 redis lua script ,保证计数过程的原子性,同时保证在分布式的情况下计数是正常的。但是这种方案存在缺点,因为它要记录时间窗口内的所有行为记录,如果这个量特别大的时候,内存消耗会变得非常严重。
参考
示例
package mainimport ("flag""fmt""log""runtime""strconv""sync""sync/atomic""time""github.com/tal-tech/go-zero/core/limit""github.com/tal-tech/go-zero/core/stores/redis")const seconds = 5var (rdx = flag.String("redis", "localhost:6379", "the redis, default localhost:6379")rdxType = flag.String("redisType", "node", "the redis type, default node")rdxPass = flag.String("redisPass", "", "the redis password")rdxKey = flag.String("redisKey", "rate", "the redis key, default rate")threads = flag.Int("threads", runtime.NumCPU(), "the concurrent threads, default to cores"))func main() {flag.Parse()result := fmt.Sprintf("rdx:%s,redisType:%s,redisPass:%s,redisKey:%s ", *rdx, *rdxType, *rdxPass, *rdxKey)fmt.Println(result)store := redis.NewRedis(*rdx, *rdxType, *rdxPass)fmt.Println(store.Ping())lmt := limit.NewPeriodLimit(seconds, 5, store, *rdxKey)timer := time.NewTimer(time.Second * seconds)quit := make(chan struct{})defer timer.Stop()go func() {<-timer.Cclose(quit)}()var allowed, denied int32var wait sync.WaitGroupfor i := 0; i < *threads; i++ {i := iwait.Add(1)go func() {for {select {case <-quit:wait.Done()returndefault:if v, err := lmt.Take(strconv.FormatInt(int64(i), 10)); err == nil && v == limit.Allowed {atomic.AddInt32(&allowed, 1)} else if err != nil {log.Fatal(err)} else {atomic.AddInt32(&denied, 1)}}}}()}wait.Wait()fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)}
PS F:\Projects\NoobWu\zero-examples\limit\period> go run .\periodlimit.go -redis="127.0.0.1:6379" -redisPass="123456" -redisKey="gozero:rate"


