Limit
type Limit float64 // 每秒可处理的事件数量
- Inf
const Inf = Limit(math.MaxFloat64)
- 根据限流间隔计算 Limit
func Every(interval time.Duration) Limit { // interval 事件处理间隔if interval <= 0 {return Inf}return 1 / Limit(interval.Seconds())}
- 生成 tokens 个令牌,需要的时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {seconds := tokens / float64(limit)return time.Nanosecond * time.Duration(1e9*seconds)}
- 时间段内,需要增发的 token 数量
func (limit Limit) tokensFromDuration(d time.Duration) float64 {return d.Seconds() * float64(limit)}
Reservation
type Reservation struct {ok bool // 是否成功获得需要的 tokenlim *Limitertokens int // token 数量timeToAct time.Timelimit Limit // Reservation 创建时,Limiter 的限流速率}
DelayFrom: 使用前,需要等待的时间
func (r *Reservation) DelayFrom(now time.Time) time.Duration {if !r.ok {return InfDuration}delay := r.timeToAct.Sub(now)if delay < 0 {return 0 // 可立即使用}return delay}
CancelAt: 归还占用的 token
func (r *Reservation) CancelAt(now time.Time) {if !r.ok { // 预留没有成功,不需要归还return}r.lim.mu.Lock()defer r.lim.mu.Unlock()if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {// 无限流 或 没有占用 token 或 token 已被消耗时,不需要归还return}// 计算需要归还的 token 数量// lastEvent 与 timeToAct 时间差,代表着预留的 token 数量,不需要归还;否则会在 advance 中重复计算restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}now, _, tokens := r.lim.advance(now)// 计算总 token 数量tokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}// 更新状态r.lim.last = nowr.lim.tokens = tokensif r.timeToAct == r.lim.lastEvent {// 还原 lastEventprevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(now) {r.lim.lastEvent = prevEvent}}return}
Limiter
type Limiter struct {limit Limit // 每秒允许的事件处理数量burst int // 一次请求,最多消耗的令牌数量mu sync.Mutextokens float64 // 令牌数量last time.Time // tokens 域最后更新时间lastEvent time.Time // rate-limit 事件发生的最近时间}
advance: 最多补偿 burst 个可用 token
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {last := lim.lastif now.Before(last) { // 调整 last 时间last = now}// 增加不超过 burst 个可用 token,需要的时间// lim.tokens 可能为负值,意味着欠其他调用者的 token 数量,需要先行补足maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)elapsed := now.Sub(last)if elapsed > maxElapsed { // 补偿时段计算elapsed = maxElapsed}// 计算增补 token 的数量delta := lim.limit.tokensFromDuration(elapsed)// 计算增补后,可用 token 数量;如果超过 burst,调整为 burst,防止后续调用超出限制tokens := lim.tokens + deltaif burst := float64(lim.burst); tokens > burst {tokens = burst}return now, last, tokens}
reserveN
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock() // 锁保护if lim.limit == Inf { // 无限流,直接返回lim.mu.Unlock()return Reservation{ok: true,lim: lim,tokens: n,timeToAct: now,}}// 考虑到上次调用时间与当前时间间隔,调整 token 数量,最多为 burst 个now, last, tokens := lim.advance(now)// 扣除需要获取的 token 数量tokens -= float64(n)var waitDuration time.Durationif tokens < 0 { // token 数量不足,计算需要等待的时间waitDuration = lim.limit.durationFromTokens(-tokens)}// 设置预留结果// 请求 token 数量没有超过单次请求最大值// 等待时间没有超过请求者预期ok := n <= lim.burst && waitDuration <= maxFutureReserve// 生成 Reservationr := Reservation{ok: ok,lim: lim,limit: lim.limit,}// 成功预留if ok {r.tokens = n// Reservation 可用时间更新;意味着,后续请求预留至少要在 r.timeToAct 后,才能获取r.timeToAct = now.Add(waitDuration)}if ok { // 预留成功lim.last = nowlim.tokens = tokens // 剩余 token 数量,可能为负值lim.lastEvent = r.timeToAct} else {lim.last = last // 预留失败,不更新 token 总数}lim.mu.Unlock()return r}
Allow
func (lim *Limiter) Allow() bool {return lim.AllowN(time.Now(), 1) // 获取一个 token}
AllowN
func (lim *Limiter) AllowN(now time.Time, n int) bool {return lim.reserveN(now, n, 0).ok // 获取 n 个 token,如果没有,不预留}
WaitN
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {if n > lim.burst && lim.limit != Inf { // 容错检查return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)}// 检查 ctx 是否已被取消select {case <-ctx.Done():return ctx.Err()default:}// 计算最长等待时间now := time.Now()waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(now)}// 预留 tokenr := lim.reserveN(now, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 预留成功时,意味着等待 n 个 token 满足的时间,不会超过 ctx 的超时时间;后续不需要再检查t := time.NewTimer(r.DelayFrom(now))defer t.Stop()select {case <-t.C:// 预留成功,n 个 token 已满足return nilcase <-ctx.Done():r.Cancel() // ctx 被取消,归还 tokenreturn ctx.Err()}}
SetLimitAt
补偿 token,并调整新的限流速率。
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {lim.mu.Lock()defer lim.mu.Unlock()now, _, tokens := lim.advance(now) // 计算需要补偿的 token 数量lim.last = nowlim.tokens = tokenslim.limit = newLimit}
