Limit

  1. type Limit float64 // 每秒可处理的事件数量
  • Inf
  1. const Inf = Limit(math.MaxFloat64)
  • 根据限流间隔计算 Limit
  1. func Every(interval time.Duration) Limit { // interval 事件处理间隔
  2. if interval <= 0 {
  3. return Inf
  4. }
  5. return 1 / Limit(interval.Seconds())
  6. }
  • 生成 tokens 个令牌,需要的时间
  1. func (limit Limit) durationFromTokens(tokens float64) time.Duration {
  2. seconds := tokens / float64(limit)
  3. return time.Nanosecond * time.Duration(1e9*seconds)
  4. }
  • 时间段内,需要增发的 token 数量
  1. func (limit Limit) tokensFromDuration(d time.Duration) float64 {
  2. return d.Seconds() * float64(limit)
  3. }

Reservation

  1. type Reservation struct {
  2. ok bool // 是否成功获得需要的 token
  3. lim *Limiter
  4. tokens int // token 数量
  5. timeToAct time.Time
  6. limit Limit // Reservation 创建时,Limiter 的限流速率
  7. }

DelayFrom: 使用前,需要等待的时间

  1. func (r *Reservation) DelayFrom(now time.Time) time.Duration {
  2. if !r.ok {
  3. return InfDuration
  4. }
  5. delay := r.timeToAct.Sub(now)
  6. if delay < 0 {
  7. return 0 // 可立即使用
  8. }
  9. return delay
  10. }

CancelAt: 归还占用的 token

  1. func (r *Reservation) CancelAt(now time.Time) {
  2. if !r.ok { // 预留没有成功,不需要归还
  3. return
  4. }
  5. r.lim.mu.Lock()
  6. defer r.lim.mu.Unlock()
  7. if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
  8. // 无限流 或 没有占用 token 或 token 已被消耗时,不需要归还
  9. return
  10. }
  11. // 计算需要归还的 token 数量
  12. // lastEvent 与 timeToAct 时间差,代表着预留的 token 数量,不需要归还;否则会在 advance 中重复计算
  13. restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  14. if restoreTokens <= 0 {
  15. return
  16. }
  17. now, _, tokens := r.lim.advance(now)
  18. // 计算总 token 数量
  19. tokens += restoreTokens
  20. if burst := float64(r.lim.burst); tokens > burst {
  21. tokens = burst
  22. }
  23. // 更新状态
  24. r.lim.last = now
  25. r.lim.tokens = tokens
  26. if r.timeToAct == r.lim.lastEvent {
  27. // 还原 lastEvent
  28. prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
  29. if !prevEvent.Before(now) {
  30. r.lim.lastEvent = prevEvent
  31. }
  32. }
  33. return
  34. }

Limiter

  1. type Limiter struct {
  2. limit Limit // 每秒允许的事件处理数量
  3. burst int // 一次请求,最多消耗的令牌数量
  4. mu sync.Mutex
  5. tokens float64 // 令牌数量
  6. last time.Time // tokens 域最后更新时间
  7. lastEvent time.Time // rate-limit 事件发生的最近时间
  8. }

advance: 最多补偿 burst 个可用 token

  1. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
  2. last := lim.last
  3. if now.Before(last) { // 调整 last 时间
  4. last = now
  5. }
  6. // 增加不超过 burst 个可用 token,需要的时间
  7. // lim.tokens 可能为负值,意味着欠其他调用者的 token 数量,需要先行补足
  8. maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
  9. elapsed := now.Sub(last)
  10. if elapsed > maxElapsed { // 补偿时段计算
  11. elapsed = maxElapsed
  12. }
  13. // 计算增补 token 的数量
  14. delta := lim.limit.tokensFromDuration(elapsed)
  15. // 计算增补后,可用 token 数量;如果超过 burst,调整为 burst,防止后续调用超出限制
  16. tokens := lim.tokens + delta
  17. if burst := float64(lim.burst); tokens > burst {
  18. tokens = burst
  19. }
  20. return now, last, tokens
  21. }

reserveN

  1. func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
  2. lim.mu.Lock() // 锁保护
  3. if lim.limit == Inf { // 无限流,直接返回
  4. lim.mu.Unlock()
  5. return Reservation{
  6. ok: true,
  7. lim: lim,
  8. tokens: n,
  9. timeToAct: now,
  10. }
  11. }
  12. // 考虑到上次调用时间与当前时间间隔,调整 token 数量,最多为 burst 个
  13. now, last, tokens := lim.advance(now)
  14. // 扣除需要获取的 token 数量
  15. tokens -= float64(n)
  16. var waitDuration time.Duration
  17. if tokens < 0 { // token 数量不足,计算需要等待的时间
  18. waitDuration = lim.limit.durationFromTokens(-tokens)
  19. }
  20. // 设置预留结果
  21. // 请求 token 数量没有超过单次请求最大值
  22. // 等待时间没有超过请求者预期
  23. ok := n <= lim.burst && waitDuration <= maxFutureReserve
  24. // 生成 Reservation
  25. r := Reservation{
  26. ok: ok,
  27. lim: lim,
  28. limit: lim.limit,
  29. }
  30. // 成功预留
  31. if ok {
  32. r.tokens = n
  33. // Reservation 可用时间更新;意味着,后续请求预留至少要在 r.timeToAct 后,才能获取
  34. r.timeToAct = now.Add(waitDuration)
  35. }
  36. if ok { // 预留成功
  37. lim.last = now
  38. lim.tokens = tokens // 剩余 token 数量,可能为负值
  39. lim.lastEvent = r.timeToAct
  40. } else {
  41. lim.last = last // 预留失败,不更新 token 总数
  42. }
  43. lim.mu.Unlock()
  44. return r
  45. }

Allow

  1. func (lim *Limiter) Allow() bool {
  2. return lim.AllowN(time.Now(), 1) // 获取一个 token
  3. }

AllowN

  1. func (lim *Limiter) AllowN(now time.Time, n int) bool {
  2. return lim.reserveN(now, n, 0).ok // 获取 n 个 token,如果没有,不预留
  3. }

WaitN

  1. func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
  2. if n > lim.burst && lim.limit != Inf { // 容错检查
  3. return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
  4. }
  5. // 检查 ctx 是否已被取消
  6. select {
  7. case <-ctx.Done():
  8. return ctx.Err()
  9. default:
  10. }
  11. // 计算最长等待时间
  12. now := time.Now()
  13. waitLimit := InfDuration
  14. if deadline, ok := ctx.Deadline(); ok {
  15. waitLimit = deadline.Sub(now)
  16. }
  17. // 预留 token
  18. r := lim.reserveN(now, n, waitLimit)
  19. if !r.ok {
  20. return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
  21. }
  22. // 预留成功时,意味着等待 n 个 token 满足的时间,不会超过 ctx 的超时时间;后续不需要再检查
  23. t := time.NewTimer(r.DelayFrom(now))
  24. defer t.Stop()
  25. select {
  26. case <-t.C:
  27. // 预留成功,n 个 token 已满足
  28. return nil
  29. case <-ctx.Done():
  30. r.Cancel() // ctx 被取消,归还 token
  31. return ctx.Err()
  32. }
  33. }

SetLimitAt

补偿 token,并调整新的限流速率。

  1. func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
  2. lim.mu.Lock()
  3. defer lim.mu.Unlock()
  4. now, _, tokens := lim.advance(now) // 计算需要补偿的 token 数量
  5. lim.last = now
  6. lim.tokens = tokens
  7. lim.limit = newLimit
  8. }