限流又称为流量控制(流控),通常是指限制到达系统的并发请求数,常用的限流算法主要有漏洞和令牌桶。
漏桶
漏桶法限流很好理解,假设我们有一个水桶按固定的速率向下方滴落一滴水,无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。
漏桶法的关键点在于漏桶始终按照固定的速率运行,但是它并不能很好的处理有大量突发请求的场景,毕竟在某些场景下我们可能需要提高系统的处理效率,而不是一味的按照固定速率处理请求。
关于漏桶的实现,uber团队有一个开源的github.com/uber-go/ratelimit库。 这个库的使用方法比较简单,Take() 方法会返回漏桶下一次滴水的时间。
import ("fmt""time""go.uber.org/ratelimit")func main() {rl := ratelimit.New(100) // per secondprev := time.Now()for i := 0; i < 10; i++ {now := rl.Take()fmt.Println(i, now.Sub(prev))prev = now}// Output:// 0 0// 1 10ms// 2 10ms// 3 10ms// 4 10ms// 5 10ms// 6 10ms// 7 10ms// 8 10ms// 9 10ms}
它的源码实现也比较简单,这里大致说一下关键的地方,有兴趣的同学可以自己去看一下完整的源码。
限制器是一个接口类型,其要求实现一个Take()方法:
type Limiter interface {// Take方法应该阻塞已确保满足 RPSTake() time.Time}
实现限制器接口的结构体定义如下,这里可以重点留意下maxSlack字段,它在后面的Take()方法中的处理。
type limiter struct {sync.Mutex // 锁last time.Time // 上一次的时刻sleepFor time.Duration // 需要等待的时间perRequest time.Duration // 每次的时间间隔maxSlack time.Duration // 最大的富余量clock Clock // 时钟}
limiter结构体实现Limiter接口的Take()方法内容如下:
// Take 会阻塞确保两次请求之间的时间走完// Take 调用平均数为 time.Second/rate.func (t *limiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果是第一次请求就直接放行if t.last.IsZero() {t.last = nowreturn t.last}// sleepFor 根据 perRequest 和上一次请求的时刻计算应该sleep的时间// 由于每次请求间隔的时间可能会超过perRequest, 所以这个数字可能为负数,并在多个请求之间累加t.sleepFor += t.perRequest - now.Sub(t.last)// 我们不应该让sleepFor负的太多,因为这意味着一个服务在短时间内慢了很多随后会得到更高的RPS。if t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}// 如果 sleepFor 是正值那么就 sleepif t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0} else {t.last = now}return t.last}
上面的代码根据记录每次请求的间隔时间和上一次请求的时刻来计算当次请求需要阻塞的时间——sleepFor,这里需要留意的是sleepFor的值可能为负,在经过间隔时间长的两次访问之后会导致随后大量的请求被放行,所以代码中针对这个场景有专门的优化处理。创建限制器的New()函数中会为maxSlack设置初始值,也可以通过WithoutSlack这个Option取消这个默认值。
func New(rate int, opts ...Option) Limiter {l := &limiter{perRequest: time.Second / time.Duration(rate),maxSlack: -10 * time.Second / time.Duration(rate),}for _, opt := range opts {opt(l)}if l.clock == nil {l.clock = clock.New()}return l}
