Limit
type Limit float64 // 每秒可处理的事件数量
- Inf
const Inf = Limit(math.MaxFloat64)
- 根据限流间隔计算 Limit
func Every(interval time.Duration) Limit { // interval 事件处理间隔
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
- 生成 tokens 个令牌,需要的时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
- 时间段内,需要增发的 token 数量
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
Reservation
type Reservation struct {
ok bool // 是否成功获得需要的 token
lim *Limiter
tokens int // token 数量
timeToAct time.Time
limit Limit // Reservation 创建时,Limiter 的限流速率
}
DelayFrom: 使用前,需要等待的时间
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0 // 可立即使用
}
return delay
}
CancelAt: 归还占用的 token
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok { // 预留没有成功,不需要归还
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
// 无限流 或 没有占用 token 或 token 已被消耗时,不需要归还
return
}
// 计算需要归还的 token 数量
// lastEvent 与 timeToAct 时间差,代表着预留的 token 数量,不需要归还;否则会在 advance 中重复计算
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
now, _, tokens := r.lim.advance(now)
// 计算总 token 数量
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// 更新状态
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
// 还原 lastEvent
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
Limiter
type Limiter struct {
limit Limit // 每秒允许的事件处理数量
burst int // 一次请求,最多消耗的令牌数量
mu sync.Mutex
tokens float64 // 令牌数量
last time.Time // tokens 域最后更新时间
lastEvent time.Time // rate-limit 事件发生的最近时间
}
advance: 最多补偿 burst 个可用 token
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) { // 调整 last 时间
last = now
}
// 增加不超过 burst 个可用 token,需要的时间
// lim.tokens 可能为负值,意味着欠其他调用者的 token 数量,需要先行补足
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed { // 补偿时段计算
elapsed = maxElapsed
}
// 计算增补 token 的数量
delta := lim.limit.tokensFromDuration(elapsed)
// 计算增补后,可用 token 数量;如果超过 burst,调整为 burst,防止后续调用超出限制
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
reserveN
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock() // 锁保护
if lim.limit == Inf { // 无限流,直接返回
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 考虑到上次调用时间与当前时间间隔,调整 token 数量,最多为 burst 个
now, last, tokens := lim.advance(now)
// 扣除需要获取的 token 数量
tokens -= float64(n)
var waitDuration time.Duration
if tokens < 0 { // token 数量不足,计算需要等待的时间
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 设置预留结果
// 请求 token 数量没有超过单次请求最大值
// 等待时间没有超过请求者预期
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// 生成 Reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 成功预留
if ok {
r.tokens = n
// Reservation 可用时间更新;意味着,后续请求预留至少要在 r.timeToAct 后,才能获取
r.timeToAct = now.Add(waitDuration)
}
if ok { // 预留成功
lim.last = now
lim.tokens = tokens // 剩余 token 数量,可能为负值
lim.lastEvent = r.timeToAct
} else {
lim.last = last // 预留失败,不更新 token 总数
}
lim.mu.Unlock()
return r
}
Allow
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1) // 获取一个 token
}
AllowN
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok // 获取 n 个 token,如果没有,不预留
}
WaitN
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
if n > lim.burst && lim.limit != Inf { // 容错检查
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// 检查 ctx 是否已被取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 计算最长等待时间
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// 预留 token
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 预留成功时,意味着等待 n 个 token 满足的时间,不会超过 ctx 的超时时间;后续不需要再检查
t := time.NewTimer(r.DelayFrom(now))
defer t.Stop()
select {
case <-t.C:
// 预留成功,n 个 token 已满足
return nil
case <-ctx.Done():
r.Cancel() // ctx 被取消,归还 token
return ctx.Err()
}
}
SetLimitAt
补偿 token,并调整新的限流速率。
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now) // 计算需要补偿的 token 数量
lim.last = now
lim.tokens = tokens
lim.limit = newLimit
}