如何限流?在工作中是怎么做的?说一下具体的实现?

什么是限流

限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流方法

计数器

实现方式

控制单位时间内的请求数量。

  1. import java.util.concurrent.atomic.AtomicInteger;
  2. public class Counter {
  3. /**
  4. * 最大访问数量
  5. */
  6. private final int limit = 10;
  7. /**
  8. * 访问时间差
  9. */
  10. private final long timeout = 1000;
  11. /**
  12. * 请求时间
  13. */
  14. private long time;
  15. /**
  16. * 当前计数器
  17. */
  18. private AtomicInteger reqCount = new AtomicInteger(0);
  19. public boolean limit() {
  20. long now = System.currentTimeMillis();
  21. if (now < time + timeout) {
  22. // 单位时间内
  23. reqCount.addAndGet(1);
  24. return reqCount.get() <= limit;
  25. } else {
  26. // 超出单位时间
  27. time = now;
  28. reqCount = new AtomicInteger(0);
  29. return true;
  30. }
  31. }
  32. }

劣势:

假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 n-1 (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 2n - 1 个。

设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。

滑动窗口

实现方式

滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。

  1. package com.example.demo1.service;
  2. import java.util.Iterator;
  3. import java.util.Random;
  4. import java.util.concurrent.ConcurrentLinkedQueue;
  5. import java.util.stream.IntStream;
  6. public class TimeWindow {
  7. private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
  8. /**
  9. * 间隔秒数
  10. */
  11. private int seconds;
  12. /**
  13. * 最大限流
  14. */
  15. private int max;
  16. public TimeWindow(int max int seconds) {
  17. this.seconds = seconds;
  18. this.max = max;
  19. /**
  20. * 永续线程执行清理queue 任务
  21. */
  22. new Thread(() -> {
  23. while (true) {
  24. try {
  25. // 等待 间隔秒数-1 执行清理操作
  26. Thread.sleep((seconds - 1) * 1000L);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. clean();
  31. }
  32. }).start();
  33. }
  34. public static void main(String[] args) throws Exception {
  35. final TimeWindow timeWindow = new TimeWindow(10 1);
  36. // 测试3个线程
  37. IntStream.range(0 3).forEach((i) -> {
  38. new Thread(() -> {
  39. while (true) {
  40. try {
  41. Thread.sleep(new Random().nextInt(20) * 100);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. timeWindow.take();
  46. }
  47. }).start();
  48. });
  49. }
  50. /**
  51. * 获取令牌,并且添加时间
  52. */
  53. public void take() {
  54. long start = System.currentTimeMillis();
  55. try {
  56. int size = sizeOfValid();
  57. if (size > max) {
  58. System.err.println("超限");
  59. }
  60. synchronized (queue) {
  61. if (sizeOfValid() > max) {
  62. System.err.println("超限");
  63. System.err.println("queue中有 " + queue.size() + " 最大数量 " + max);
  64. }
  65. this.queue.offer(System.currentTimeMillis());
  66. }
  67. System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);
  68. }
  69. }
  70. public int sizeOfValid() {
  71. Iterator<Long> it = queue.iterator();
  72. Long ms = System.currentTimeMillis() - seconds * 1000;
  73. int count = 0;
  74. while (it.hasNext()) {
  75. long t = it.next();
  76. if (t > ms) {
  77. // 在当前的统计时间范围内
  78. count++;
  79. }
  80. }
  81. return count;
  82. }
  83. /**
  84. * 清理过期的时间
  85. */
  86. public void clean() {
  87. Long c = System.currentTimeMillis() - seconds * 1000;
  88. Long tl = null;
  89. while ((tl = queue.peek()) != null && tl < c) {
  90. System.out.println("清理数据");
  91. queue.poll();
  92. }
  93. }
  94. }

Leaky Bucket 漏桶

实现方式

规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。

  1. public class LeakBucket {
  2. /**
  3. * 时间
  4. */
  5. private long time;
  6. /**
  7. * 总量
  8. */
  9. private Double total;
  10. /**
  11. * 水流出去的速度
  12. */
  13. private Double rate;
  14. /**
  15. * 当前总量
  16. */
  17. private Double nowSize;
  18. public boolean limit() {
  19. long now = System.currentTimeMillis();
  20. nowSize = Math.max(0 (nowSize - (now - time) * rate));
  21. time = now;
  22. if ((nowSize + 1) < total) {
  23. nowSize++;
  24. return true;
  25. } else {
  26. return false;
  27. }
  28. }
  29. }

Token Bucket 令牌桶

实现方式

规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求。

  1. public class TokenBucket {
  2. /**
  3. * 时间
  4. */
  5. private long time;
  6. /**
  7. * 总量
  8. */
  9. private Double total;
  10. /**
  11. * token 放入速度
  12. */
  13. private Double rate;
  14. /**
  15. * 当前总量
  16. */
  17. private Double nowSize;
  18. public boolean limit() {
  19. long now = System.currentTimeMillis();
  20. nowSize = Math.min(total nowSize + (now - time) * rate);
  21. time = now;
  22. if (nowSize < 1) {
  23. // 桶里没有token
  24. return false;
  25. } else {
  26. // 存在token
  27. nowSize -= 1;
  28. return true;
  29. }
  30. }
  31. }

工作中的使用

spring cloud gateway

  • spring cloud gateway 默认使用 redis 进行限流,笔者一般只是修改修改参数属于拿来即用,并没有去从头实现上述那些算法。
  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-gateway</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
  8. </dependency>
  1. spring:
  2. cloud:
  3. gateway:
  4. routes:
  5. - id: requestratelimiter_route
  6. uri: lb://pigx-upms
  7. order: 10000
  8. predicates:
  9. - Path=/admin/**
  10. filters:
  11. - name: RequestRateLimiter
  12. args:
  13. redis-rate-limiter.replenishRate: 1 # 令牌桶的容积
  14. redis-rate-limiter.burstCapacity: 3 # 流速 每秒
  15. key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean
  16. - StripPrefix=1
  1. @Bean
  2. KeyResolver remoteAddrKeyResolver() {
  3. return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
  4. }

sentinel

  • 通过配置来控制每个 url 的流量
  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  4. </dependency>
  1. spring:
  2. cloud:
  3. nacos:
  4. discovery:
  5. server-addr: localhost:8848
  6. sentinel:
  7. transport:
  8. dashboard: localhost:8080
  9. port: 8720
  10. datasource:
  11. ds:
  12. nacos:
  13. server-addr: localhost:8848
  14. dataId: spring-cloud-sentinel-nacos
  15. groupId: DEFAULT_GROUP
  16. rule-type: flow
  17. namespace: xxxxxxxx
  • 配置内容在 nacos 上进行编辑
  1. [
  2. {
  3. "resource": "/hello"
  4. "limitApp": "default"
  5. "grade": 1
  6. "count": 1
  7. "strategy": 0
  8. "controlBehavior": 0
  9. "clusterMode": false
  10. }
  11. ]
  • resource:资源名,即限流规则的作用对象。
  • limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
  • grade:限流阈值类型,QPS 或线程数模式,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。
  • count:限流阈值
  • strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口
  • controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式)
  • clusterMode:是否为集群模式

总结

sentinel 和 spring cloud gateway 两个框架都是很好的限流框架, 但是在我使用中还没有将spring-cloud-alibaba接入到项目中进行使用, 所以我会选择spring cloud gateway, 当接入完整的或者接入 Nacos 项目使用 setinel 会有更加好的体验.