简单限流

首先我们来看一个常见 的简单的限流策略。系统要限定用户的某个行为在指定的时间里只能允许发生 N 次,如何使用 Redis 的数据结构来实现这个限流的功能?
这个限流需求中存在一个滑动时间窗口,想想 zset 数据结构的 score 值,是不是可以通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉。那这个 zset 的 value 填什么比较合适呢?它只需要保证唯一性即可,用 uuid 会比较浪费空间,那就改用毫秒时间戳吧。
image.png

如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个key 保存下来。同一个用户同一种行为用一个 zset 记录。

  1. public class SimpleRateLimiter {
  2. private Jedis jedis;
  3. public SimpleRateLimiter(Jedis jedis) {
  4. this.jedis = jedis;
  5. }
  6. public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
  7. String key = String.format("hist:%s:%s", userId, actionKey);
  8. long nowTs = System.currentTimeMillis();
  9. Pipeline pipe = jedis.pipelined();
  10. pipe.multi();
  11. pipe.zadd(key, nowTs, "" + nowTs);
  12. //将时间窗口外的记录全部清理掉,只保留窗口内的记录
  13. pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
  14. Response<Long> count = pipe.zcard(key);
  15. pipe.expire(key, period + 1);
  16. pipe.exec();
  17. pipe.close();
  18. return count.get() <= maxCount;
  19. }
  20. public static void main(String[] args) {
  21. Jedis jedis = new Jedis();
  22. SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
  23. for(int i=0;i<20;i++) {
  24. System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
  25. }
  26. }
  27. }

它的整体思路就是:每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了。
因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升Redis 存取效率。但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。

该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。
image.png
上面这个指令的意思是允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率),漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水 速率的影响。我们看到这个指令中漏水速率变成了 2 个参数,替代了之前的单个浮点数。用 两个参数相除的结果来表达漏水速率相对单个浮点数要更加直观一些。

  1. > cl.throttle laoqian:reply 15 30 60
  2. 1) (integer) 0 # 0 表示允许,1 表示拒绝
  3. 2) (integer) 15 # 漏斗容量 capacity
  4. 3) (integer) 14 # 漏斗剩余空间 left_quota
  5. 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
  6. 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周 到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想 阻塞线程,也可以异步定时任务来重试。