上篇文章提到固定时间窗口限流无法处理突然请求洪峰情况,本文讲述的令牌桶线路算法则可以比较好的处理此场景。
工作原理
- 单位时间按照一定速率匀速的生产token放入桶内,直到达到桶容量上限。
- 处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。

优缺点
优点
可以有效处理瞬间的突发流量,桶内存量token即可作为流量缓冲区平滑处理突发流量。
缺点
实现较为复杂
代码实现
core/limit/tokenlimit.go
分布式环境下考虑使用redis作为桶和令牌的存储容器,采用lua脚本实现整个算法流程。
redis lua脚本
--每秒生成token数量即token生成速度local rate = tonumber(ARGV[1])--桶容量local capacity = tonumber(ARGV[2])--当前时间戳local now = tonumber(ARGV[3])--当前请求token数量local requested = tonumber(ARGV[4])--需要多少秒才能填满桶local fill_time = capacity/rate--向下取整,ttl为填满时间的2倍local ttl = math.floor(fill_time*2)--当前时间桶容量local last_tokens = tonumber(redis.call("get", KEYS[1]))--如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量if last_tokens == nil thenlast_tokens = capacityend--上一次刷新的时间local last_refreshed = tonumber(redis.call("get", KEYS[2]))--第一次进入则设置刷新时间为0if last_refreshed == nil thenlast_refreshed = 0end--距离上次请求的时间跨度local delta = math.max(0, now-last_refreshed)--距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的tokenlocal filled_tokens = math.min(capacity, last_tokens+(delta*rate))--本次请求token数量是否足够local allowed = filled_tokens >= requested--桶剩余数量local new_tokens = filled_tokens--允许本次token申请,计算剩余数量if allowed thennew_tokens = filled_tokens - requestedend--设置剩余token数量redis.call("setex", KEYS[1], ttl, new_tokens)--设置刷新时间redis.call("setex", KEYS[2], ttl, now)return allowed
令牌桶限流器定义
type TokenLimiter struct {//每秒生产速率rate int//桶容量burst int//存储容器store *redis.Redis//redis keytokenKey string//桶刷新时间keytimestampKey string//lockrescueLock sync.Mutex//redis健康标识redisAlive uint32//redis故障时采用进程内 令牌桶限流器rescueLimiter *xrate.Limiter//redis监控探测任务标识monitorStarted bool}func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {tokenKey := fmt.Sprintf(tokenFormat, key)timestampKey := fmt.Sprintf(timestampFormat, key)return &TokenLimiter{rate: rate,burst: burst,store: store,tokenKey: tokenKey,timestampKey: timestampKey,redisAlive: 1,rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),}}
获取令牌

func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {//判断redis是否健康//redis故障时采用进程内限流器//兜底保障if atomic.LoadUint32(&lim.redisAlive) == 0 {return lim.rescueLimiter.AllowN(now, n)}//执行脚本获取令牌resp, err := lim.store.Eval(script,[]string{lim.tokenKey,lim.timestampKey,},[]string{strconv.Itoa(lim.rate),strconv.Itoa(lim.burst),strconv.FormatInt(now.Unix(), 10),strconv.Itoa(n),})// redis allowed == false// Lua boolean false -> r Nil bulk reply//特殊处理key不存在的情况if err == redis.Nil {return false} else if err != nil {logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)//执行异常,开启redis健康探测任务//同时采用进程内限流器作为兜底lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}code, ok := resp.(int64)if !ok {logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)lim.startMonitor()return lim.rescueLimiter.AllowN(now, n)}// redis allowed == true// Lua boolean true -> r integer reply with value of 1return code == 1}
redis故障时兜底策略
兜底策略的设计考虑得非常细节
//开启redis健康探测func (lim *TokenLimiter) startMonitor() {lim.rescueLock.Lock()defer lim.rescueLock.Unlock()//防止重复开启if lim.monitorStarted {return}//设置任务和健康标识lim.monitorStarted = trueatomic.StoreUint32(&lim.redisAlive, 0)//健康探测go lim.waitForRedis()}//redis健康探测定时任务func (lim *TokenLimiter) waitForRedis() {ticker := time.NewTicker(pingInterval)//健康探测成功时回调此函数defer func() {ticker.Stop()lim.rescueLock.Lock()lim.monitorStarted = falselim.rescueLock.Unlock()}()for range ticker.C {//ping属于redis内置健康探测命令if lim.store.Ping() {//健康探测成功,设置健康标识atomic.StoreUint32(&lim.redisAlive, 1)return}}}
