不管是在单体服务中还是在微服务中,开发者为前端提供的API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性。即接口也需要安装上保险丝,以防止非预期的请求对系统压力过大而引起的系统瘫痪。

本文就来介绍一下 periodlimit

使用

  1. const (
  2. seconds = 1
  3. total = 100
  4. quota = 5
  5. )
  6. // New limiter
  7. l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
  8. // take source
  9. code, err := l.Take("first")
  10. if err != nil {
  11. logx.Error(err)
  12. return true
  13. }
  14. // switch val => process request
  15. switch code {
  16. case limit.OverQuota:
  17. logx.Errorf("OverQuota key: %v", key)
  18. return false
  19. case limit.Allowed:
  20. logx.Infof("AllowedQuota key: %v", key)
  21. return true
  22. case limit.HitQuota:
  23. logx.Errorf("HitQuota key: %v", key)
  24. // todo: maybe we need to let users know they hit the quota
  25. return false
  26. default:
  27. logx.Errorf("DefaultQuota key: %v", key)
  28. // unknown response, we just let the sms go
  29. return true
  30. }


periodlimit

go-zero 采取 滑动窗口 计数的方式,计算一段时间内对同一个资源的访问次数,如果超过指定的 limit ,则拒绝访问。当然如果你是在一段时间内访问不同的资源,每一个资源访问量都不超过 limit ,此种情况是允许大量请求进来的。

而在一个分布式系统中,存在多个微服务提供服务。所以当瞬间的流量同时访问同一个资源,如何让计数器在分布式系统中正常计数? 同时在计算资源访问时,可能会涉及多个计算,如何保证计算的原子性?

  • go-zero 借助 redis incrby 做资源访问计数
  • 采用 lua script 做整个窗口计算,保证计算的原子性

下面来看看 lua script 控制的几个关键属性:

argument mean
key[1] 访问资源的标示
ARGV[1] limit => 请求总数,超过则限速。可设置为 QPS
ARGV[2] window大小 => 滑动窗口,用 ttl 模拟出滑动的效果
  1. -- to be compatible with aliyun redis,
  2. -- we cannot use `local key = KEYS[1]` to reuse thekey
  3. local limit = tonumber(ARGV[1])
  4. local window = tonumber(ARGV[2])
  5. -- incrbt key 1 => key visis++
  6. local current = redis.call("INCRBY", KEYS[1], 1)
  7. -- 如果是第一次访问,设置过期时间 => TTL = window size
  8. -- 因为是只限制一段时间的访问次数
  9. if current == 1 then
  10. redis.call("expire", KEYS[1], window)
  11. return 1
  12. elseif current < limit then
  13. return 1
  14. elseif current == limit then
  15. return 2
  16. else
  17. return 0
  18. end

至于上述的 return code ,返回给调用方。由调用方来决定请求后续的操作:

return code tag call code mean
0 OverQuota 3 over limit
1 Allowed 1 in limit
2 HitQuota 2 hit limit

下面这张图描述了请求进入的过程,以及请求触发 limit 时后续发生的情况: image.png image.png


后续处理

如果在服务某个时间点,请求大批量打进来,periodlimit 短期时间内达到 limit 阈值,而且设置的时间范围还远远没有到达。后续请求的处理就成为问题。

periodlimit 中并没有处理,而是返回 code 。把后续请求的处理交给了开发者自己处理。

  1. 如果不做处理,那就是简单的将请求拒绝
  2. 如果需要处理这些请求,开发者可以借助 mq 将请求缓冲,减缓请求的压力
  3. 采用 tokenlimit,允许暂时的流量冲击

所以下一篇我们就来聊聊 tokenlimit


总结

go-zero 中的 periodlimit 限流方案是基于 redis 计数器,通过调用 redis lua script ,保证计数过程的原子性,同时保证在分布式的情况下计数是正常的。但是这种方案存在缺点,因为它要记录时间窗口内的所有行为记录,如果这个量特别大的时候,内存消耗会变得非常严重。

参考

示例

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "runtime"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/tal-tech/go-zero/core/limit"
  12. "github.com/tal-tech/go-zero/core/stores/redis"
  13. )
  14. const seconds = 5
  15. var (
  16. rdx = flag.String("redis", "localhost:6379", "the redis, default localhost:6379")
  17. rdxType = flag.String("redisType", "node", "the redis type, default node")
  18. rdxPass = flag.String("redisPass", "", "the redis password")
  19. rdxKey = flag.String("redisKey", "rate", "the redis key, default rate")
  20. threads = flag.Int("threads", runtime.NumCPU(), "the concurrent threads, default to cores")
  21. )
  22. func main() {
  23. flag.Parse()
  24. result := fmt.Sprintf("rdx:%s,redisType:%s,redisPass:%s,redisKey:%s ", *rdx, *rdxType, *rdxPass, *rdxKey)
  25. fmt.Println(result)
  26. store := redis.NewRedis(*rdx, *rdxType, *rdxPass)
  27. fmt.Println(store.Ping())
  28. lmt := limit.NewPeriodLimit(seconds, 5, store, *rdxKey)
  29. timer := time.NewTimer(time.Second * seconds)
  30. quit := make(chan struct{})
  31. defer timer.Stop()
  32. go func() {
  33. <-timer.C
  34. close(quit)
  35. }()
  36. var allowed, denied int32
  37. var wait sync.WaitGroup
  38. for i := 0; i < *threads; i++ {
  39. i := i
  40. wait.Add(1)
  41. go func() {
  42. for {
  43. select {
  44. case <-quit:
  45. wait.Done()
  46. return
  47. default:
  48. if v, err := lmt.Take(strconv.FormatInt(int64(i), 10)); err == nil && v == limit.Allowed {
  49. atomic.AddInt32(&allowed, 1)
  50. } else if err != nil {
  51. log.Fatal(err)
  52. } else {
  53. atomic.AddInt32(&denied, 1)
  54. }
  55. }
  56. }
  57. }()
  58. }
  59. wait.Wait()
  60. fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
  61. }
  1. PS F:\Projects\NoobWu\zero-examples\limit\period> go run .\periodlimit.go -redis="127.0.0.1:6379" -redisPass="123456" -redisKey="gozero:rate"

image.png
image.png

原文链接

https://go-zero.dev/cn/periodlimit.html