前言

最近在学习限流算法,想把四种限流算法整合进入Dubbo里,于是用DubboFilter来实现限流。

Dubbo版本

  1. <groupId>org.apache.dubbo</groupId>
  2. <artifactId>dubbo</artifactId>
  3. <version>2.7.7</version>

准备工作

构建三个模块,dubbo-api(dubbo暴露的接口),dubbo-provider(dubbo提供者),dubbo-consumer(dubbo消费者)

dubbo-api

  1. public interface DemoService {
  2. String sayHello(String name);
  3. }

dubbo-provider

  1. @DubboService
  2. public class DemoServiceImpl implements DemoService {
  3. @Value("${dubbo.application.name}")
  4. private String serviceName;
  5. @Override
  6. public String sayHello(String name) {
  7. return String.format("[%s] : Hello, %s", serviceName, name);
  8. }
  9. }

Dubbo-consumer

  1. @RestController
  2. @RequestMapping("/test")
  3. @Api(tags = "Dubbo服务测试类")
  4. public class DemoController {
  5. /**
  6. * 为了调试限流,关闭超时报错,关闭重试机制
  7. */
  8. @DubboReference(timeout = 1000000,retries=0)
  9. private DemoService demoService;
  10. @ApiOperation(value = "服务调用测试", notes = "")
  11. @GetMapping("sayHello")
  12. public String sayHello() {
  13. String name = demoService.sayHello("ZywooLee-debug");
  14. return name;
  15. }
  16. }

固定窗口限流算法(Dubbo内嵌)

Dubbo内嵌了令牌桶限流了,不过默认并没有开启,需要在服务提供者里实现SPI,在dubbo-provider/src/main/resources/META-INF/dubbo路径下创建文件org.apache.dubbo.rpc.Filter

文件内容:

  1. tps=org.apache.dubbo.rpc.filter.TpsLimitFilter

并在服务提供者上设置配置,filter会按照数组里配置的先后顺序执行(责任链设计模式),这里指定了tps(也就是SPI设置文件里的tps

  1. //选择Dubbo令牌限流算法
  2. @DubboService(filter = {"tps"},parameters = {"tps","2","tps.interval","1000"})、
  3. public class DemoServiceImpl implements DemoService {
  4. //...
  5. }

接下来查看我们设置的org.apache.dubbo.rpc.filter.TpsLimitFilter

  1. //TPS_LIMIT_RATE_KEY为“tps”,在org.apache.dubbo.rpc.Filter文件里指定的tps对应这里
  2. @Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)
  3. public class TpsLimitFilter implements Filter {
  4. private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
  5. @Override
  6. public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  7. if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
  8. throw new RpcException(
  9. "Failed to invoke service " +
  10. invoker.getInterface().getName() +
  11. "." +
  12. invocation.getMethodName() +
  13. " because exceed max service tps.");
  14. }
  15. return invoker.invoke(invocation);
  16. }
  17. }

可以看到限流的是在org.apache.dubbo.rpc.filter.tps.DefaultTPSLimiter处理的

  1. public class DefaultTPSLimiter implements TPSLimiter {
  2. //key->接口名称(如:com.dubbo.api.DemoService) value:StatItem类(令牌桶算法实现)
  3. private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
  4. @Override
  5. public boolean isAllowable(URL url, Invocation invocation) {
  6. // 窗口内限制请求数
  7. int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);
  8. // 窗口长度(单位:ms)
  9. long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);
  10. // 服务名称
  11. String serviceKey = url.getServiceKey();
  12. if (rate > 0) {
  13. // 从缓存中获取限流实现对象
  14. StatItem statItem = stats.get(serviceKey);
  15. if (statItem == null) {
  16. stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
  17. statItem = stats.get(serviceKey);
  18. } else {
  19. //限制请求数改变,重新构建限流实现对象
  20. if (statItem.getRate() != rate || statItem.getInterval() != interval) {
  21. stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
  22. statItem = stats.get(serviceKey);
  23. }
  24. }
  25. return statItem.isAllowable();
  26. } else {
  27. // 限流数小于等于0,remove限流实现对象,放行
  28. StatItem statItem = stats.get(serviceKey);
  29. if (statItem != null) {
  30. stats.remove(serviceKey);
  31. }
  32. }
  33. return true;
  34. }
  35. }

限流的实现类是org.apache.dubbo.rpc.filter.tps.StatItem

  1. class StatItem {
  2. //服务名称
  3. private String name;
  4. //上一次访问时间
  5. private long lastResetTime;
  6. //窗口长度(单位:ms)
  7. private long interval;
  8. //计数器
  9. private LongAdder token;
  10. //窗口内限制请求数
  11. private int rate;
  12. StatItem(String name, int rate, long interval) {
  13. this.name = name;
  14. this.rate = rate;
  15. this.interval = interval;
  16. this.lastResetTime = System.currentTimeMillis();
  17. this.token = buildLongAdder(rate);
  18. }
  19. public boolean isAllowable() {
  20. long now = System.currentTimeMillis();
  21. //当前时间戳>上次访问时间+窗口长度,计数器初始化为限制请求数
  22. if (now > lastResetTime + interval) {
  23. token = buildLongAdder(rate);
  24. lastResetTime = now;
  25. }
  26. //计数器小于0,此时间窗口内访问数量已用完
  27. if (token.sum() < 0) {
  28. return false;
  29. }
  30. //计数器次数-1
  31. token.decrement();
  32. return true;
  33. }
  34. public long getInterval() {
  35. return interval;
  36. }
  37. public int getRate() {
  38. return rate;
  39. }
  40. long getLastResetTime() {
  41. return lastResetTime;
  42. }
  43. long getToken() {
  44. return token.sum();
  45. }
  46. @Override
  47. public String toString() {
  48. return new StringBuilder(32).append("StatItem ")
  49. .append("[name=").append(name).append(", ")
  50. .append("rate = ").append(rate).append(", ")
  51. .append("interval = ").append(interval).append("]")
  52. .toString();
  53. }
  54. private LongAdder buildLongAdder(int rate) {
  55. LongAdder adder = new LongAdder();
  56. adder.add(rate);
  57. return adder;
  58. }
  59. }

自定义限流实现

因为是自行实现的限流,所以需要先扩展SPI,首先我先定义一个自己的ZywooLimitFilter,并在org.apache.dubbo.rpc.Filter文件里添加配置

  1. #这个是原先Dubbo提供的TPSLimit
  2. tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
  3. #这个是我自行实现的Filter
  4. zywoo=com.dubbo.filter.ZywooLimitFilter

org.apache.dubbo.rpc.filter.TpsLimitFilter基本无异,只是指定了@Activatevalue,还有将TPSLimiter的实现改为ZywooTPSLimiter,代码如下

  1. package com.dubbo.filter;
  2. import org.apache.dubbo.common.constants.CommonConstants;
  3. import org.apache.dubbo.common.extension.Activate;
  4. import org.apache.dubbo.rpc.*;
  5. import org.apache.dubbo.rpc.filter.tps.TPSLimiter;
  6. /**
  7. * @Auther: Zywoo Lee
  8. * @Date: 2022/4/1 20:46
  9. * @Description: 自定义Filter实现
  10. */
  11. @Activate(group = CommonConstants.PROVIDER, value = "zywoo")
  12. public class ZywooLimitFilter implements Filter {
  13. //Dubbo默认实现
  14. //private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
  15. //自定义实现
  16. private final TPSLimiter tpsLimiter = new ZywooTPSLimiter();
  17. @Override
  18. public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  19. if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
  20. throw new RpcException(
  21. "Failed to invoke service " +
  22. invoker.getInterface().getName() +
  23. "." +
  24. invocation.getMethodName() +
  25. " because exceed max service tps.");
  26. }
  27. return invoker.invoke(invocation);
  28. }
  29. }

com.dubbo.filter.ZywooLimitFilter集成了剩余三种限流算法

  1. public class ZywooTPSLimiter implements TPSLimiter {
  2. private final ConcurrentMap<String, LimitItem> stats = new ConcurrentHashMap<>();
  3. @Override
  4. public boolean isAllowable(URL url, Invocation invocation) {
  5. String serviceKey = url.getServiceKey();
  6. //选择限流的策略
  7. String strategy = url.getParameter("strategy");
  8. //获取限流实现类
  9. LimitItem limitItem = stats.get(serviceKey);
  10. if (limitItem == null) {
  11. switch (strategy) {
  12. //漏桶限流算法
  13. case "leakyBucket": {
  14. long rate = url.getParameter("rate", -1L);
  15. long capacity = url.getParameter("capacity", -1L);
  16. long currentTime = System.currentTimeMillis();
  17. stats.putIfAbsent(serviceKey, new LeakyBucketItem(serviceKey, rate, 0L, currentTime, capacity));
  18. break;
  19. }
  20. //固定窗口限流算法
  21. case "fixedWindow": {
  22. //默认1000毫秒滑动一次
  23. long windowUnit = url.getParameter("windowUnit", 1000L);
  24. //获取限流次数
  25. int limitCount = url.getParameter("limitCount", 0);
  26. long currentTime = System.currentTimeMillis();
  27. stats.putIfAbsent(serviceKey, new FixedWindowItem(serviceKey, windowUnit, currentTime, 0, limitCount));
  28. }
  29. //滑动窗口限流算法
  30. case "slidingWindow": {
  31. //限流个数
  32. int limitCount = url.getParameter("limitCount", 1);
  33. //时间窗数量
  34. int sampleCount = url.getParameter("sampleCount", 10);
  35. //时间窗长度
  36. int intervalInMs = url.getParameter("intervalInMs", 1000);
  37. stats.putIfAbsent(serviceKey,new SlidingWindowItem(limitCount,sampleCount,intervalInMs));
  38. }
  39. //无获取到限流算法,直接放行
  40. default: {
  41. return true;
  42. }
  43. }
  44. limitItem = stats.get(serviceKey);
  45. }
  46. return limitItem.isAllowed();
  47. }
  48. }

滑动窗口限流算法(参考Sentinel)

滑动窗口算法我参考了sentinel来进行实现。主要是将WindowWrap里的value固定为LongAdder只记录小时间窗(时间窗长度/时间窗个数)里的访问次数(Sentinel里的value实际上是MetricBucket类,内部维护了LongAdder数组,来去记录这个时间里通过、阻塞、报错、限流的次数分别多少我这边精简实现,记录通过数量即可)

滑动窗口的实现com.dubbo.limit.slidingWindow.LeapArray

  1. /**
  2. * @Auther: Zywoo Lee
  3. * @Date: 2022/4/2 15:57
  4. * @Description: 滑动窗口限流算法
  5. */
  6. public class LeapArray implements LimitItem {
  7. // 样本窗口长度
  8. private int windowLengthInMs;
  9. // 一个时间窗中包含的时间窗数量
  10. private int sampleCount;
  11. // 时间窗长度
  12. private int intervalInMs;
  13. //元素为WindowWrap样本窗口
  14. private final AtomicReferenceArray<WindowWrap> array;
  15. //修改锁
  16. private final ReentrantLock updateLock = new ReentrantLock();
  17. /**
  18. *
  19. * 功能描述: 构造函数
  20. *
  21. * @param: sampleCount 时间窗数量 intervalInMs 时间窗长度
  22. * @return:
  23. * @auther: Zywoo Lee
  24. * @date: 2022/4/5 14:19
  25. */
  26. public LeapArray(int sampleCount, int intervalInMs){
  27. this.windowLengthInMs = intervalInMs / sampleCount;
  28. this.intervalInMs = intervalInMs;
  29. this.sampleCount = sampleCount;
  30. this.array = new AtomicReferenceArray(sampleCount);
  31. }
  32. public WindowWrap currentWindow(){
  33. // 获取当前时间点所在的样本窗口
  34. return currentWindow(System.currentTimeMillis());
  35. }
  36. @Override
  37. public boolean isAllowed() {
  38. //获取当前时间段在哪个滑动窗口
  39. return true;
  40. }
  41. public WindowWrap currentWindow(long timeMillis){
  42. if (timeMillis < 0) {
  43. return null;
  44. }
  45. // 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引
  46. int idx = calculateTimeIdx(timeMillis);
  47. // 计算当前样本窗口的开始时间点
  48. long windowStart = calculateWindowStart(timeMillis);
  49. while(true){
  50. WindowWrap old = array.get(idx);
  51. // 若当前时间所在样本窗口为null,说明该样本窗口还不存在,则创建一个
  52. if (old==null){
  53. WindowWrap window = new WindowWrap(windowLengthInMs, windowStart,new LongAdder());
  54. if (array.compareAndSet(idx,null,window)){
  55. return window;
  56. }else{
  57. Thread.yield();
  58. }
  59. }
  60. // 若当前样本窗口的起始时间点与计算出的样本窗口起始时间点相同,
  61. // 则说明这两个是同一个样本窗口
  62. else if(windowStart==old.windowStart()){
  63. return old;
  64. }
  65. // 若当前样本窗口的起始时间点 大于 计算出的样本窗口起始时间点,
  66. // 说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换
  67. else if (windowStart > old.windowStart()){
  68. if (updateLock.tryLock()) {
  69. try {
  70. // 替换掉老的样本窗口
  71. return resetWindowTo(old, windowStart);
  72. } finally {
  73. updateLock.unlock();
  74. }
  75. } else {
  76. // Contention failed, the thread will yield its time slice to wait for bucket available.
  77. Thread.yield();
  78. }
  79. }
  80. // 当前样本窗口的起始时间点 小于 计算出的样本窗口起始时间点,
  81. // 这种情况一般不会出现,因为时间不会倒流。除非人为修改了系统时钟
  82. else if (windowStart < old.windowStart()){
  83. return new WindowWrap(windowLengthInMs, windowStart,new LongAdder());
  84. }
  85. }
  86. }
  87. public int calculateTimeIdx(long timeMillis) {
  88. // 计算出当前时间在哪个样本窗口
  89. long timeId = timeMillis / windowLengthInMs;
  90. // Calculate current index so we can map the timestamp to the leap array.
  91. return (int)(timeId % array.length());
  92. }
  93. private long calculateWindowStart(long timeMillis) {
  94. return timeMillis - timeMillis % windowLengthInMs;
  95. }
  96. private WindowWrap resetWindowTo(WindowWrap w, long startTime) {
  97. System.out.println("重置时间窗");
  98. // 更新窗口起始时间
  99. w.resetTo(startTime);
  100. // 将访问次数数据清零
  101. w.value().reset();
  102. return w;
  103. }
  104. public List values() {
  105. return values(System.currentTimeMillis());
  106. }
  107. public List values(long timeMillis) {
  108. if (timeMillis < 0) {
  109. return new ArrayList();
  110. }
  111. int size = array.length();
  112. List result = new ArrayList(size);
  113. // 逐个遍历array中的每一个样本窗口实例
  114. for (int i = 0; i < size; i++) {
  115. WindowWrap windowWrap = array.get(i);
  116. // 若当前遍历实例为空或已经过时,则继续下一个
  117. if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
  118. continue;
  119. }
  120. // 将当前遍历的样本窗口统计的数据记录到result中
  121. result.add(windowWrap.value());
  122. }
  123. return result;
  124. }
  125. public boolean isWindowDeprecated(long time, WindowWrap windowWrap) {
  126. // 当前时间与当前样本窗口的时间差 大于 时间窗长度,
  127. // 说明当前样本窗口已经过时
  128. return time - windowWrap.windowStart() > intervalInMs;
  129. }
  130. public int getWindowLengthInMs() {
  131. return windowLengthInMs;
  132. }
  133. }

统计类com.dubbo.limit.slidingWindow.WindowWrap

  1. public class WindowWrap {
  2. /**
  3. * 样本窗口长度
  4. */
  5. private final long windowLengthInMs;
  6. /**
  7. * 样本窗口的起始时间戳
  8. */
  9. private long windowStart;
  10. /**
  11. * 当前样本窗口中的统计数据
  12. */
  13. private LongAdder value;
  14. public WindowWrap(long windowLengthInMs, long windowStart,LongAdder value) {
  15. this.windowLengthInMs = windowLengthInMs;
  16. this.windowStart = windowStart;
  17. this.value = value;
  18. }
  19. /**
  20. *
  21. * 功能描述:重置设置此窗口的开始时间戳
  22. *
  23. * @param:
  24. * @return:
  25. * @auther: Zywoo Lee
  26. * @date: 2022/4/5 14:11
  27. */
  28. public WindowWrap resetTo(long startTime){
  29. this.windowStart = startTime;
  30. return this;
  31. }
  32. /**
  33. *
  34. * 功能描述:判断时间戳是否在此时间窗口内
  35. *
  36. * @param:
  37. * @return:
  38. * @auther: Zywoo Lee
  39. * @date: 2022/4/5 14:11
  40. */
  41. public boolean isTimeInWindow(long timeMillis) {
  42. return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
  43. }
  44. public long windowStart() {
  45. return windowStart;
  46. }
  47. public LongAdder value() {
  48. return value;
  49. }
  50. }

测试用例

  1. @org.junit.Test
  2. public void SlidingWindowTest() throws InterruptedException {
  3. //时间窗长度为1s,分割为10个滑动窗口,时间窗内只能有150个放行
  4. LimitItem slidingWindowItem = new SlidingWindowItem(150,10,1000);
  5. AtomicInteger allowed = new AtomicInteger(0);
  6. AtomicInteger limited = new AtomicInteger(0);
  7. long beginTime = System.currentTimeMillis();
  8. for (int i = 0; i < 2000; i++) {
  9. new Thread(() -> {
  10. //System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
  11. if (slidingWindowItem.isAllowed()) {
  12. allowed.addAndGet(1);
  13. } else {
  14. limited.addAndGet(1);
  15. }
  16. }, i + "").start();
  17. //每20次休眠100毫秒
  18. if (i % 20 == 0) {
  19. Thread.sleep(100);
  20. }
  21. }
  22. long endTime = System.currentTimeMillis();
  23. //等待1s让所有线程都请求完再展示结果
  24. Thread.sleep(1000);
  25. System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
  26. System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
  27. }

首次测试结果

执行时间:10570毫秒
限制次数:414通过次数:1586

1586/10.570=150.047304,约等于每秒150,符合预期

第二次测试结果

执行时间:10593毫秒
限制次数:415通过次数:1585

1585/10.593=149.627112,约等于每秒150,符合预期

第三次测试结果

执行时间:10640毫秒
限制次数:409通过次数:1591

1591/10.64=149.530075,约等于每秒150,符合预期

令牌桶限流算法(Guava)

令牌桶限流算法我引入了GuavaRateLimiter

版本

  1. <dependency>
  2. <groupId>com.google.guava</groupId>
  3. <artifactId>guava</artifactId>
  4. <version>31.0-jre</version>
  5. <type>pom</type>
  6. </dependency>
  1. /**
  2. * @Auther: Zywoo Lee
  3. * @Date: 2022/4/10 15:00
  4. * @Description: Guava令牌桶限流算法
  5. */
  6. public class GuavaRateLimit implements LimitItem{
  7. /**
  8. * 服务名称
  9. */
  10. String name;
  11. /**
  12. * Guava限流实现
  13. */
  14. RateLimiter limit;
  15. /**
  16. *
  17. * @param name 服务名称
  18. * @param permitsPerSecond 每秒限制通过数
  19. */
  20. public GuavaRateLimit(String name,double permitsPerSecond) {
  21. this.name = name;
  22. limit = RateLimiter.create(permitsPerSecond);
  23. }
  24. @Override
  25. public boolean isAllowed() {
  26. return limit.tryAcquire();
  27. }
  28. }

调用核心链路:

  1. com.dubbo.limit.GuavaRateLimit#isAllowed
  2. com.google.common.util.concurrent.RateLimiter#tryAcquire()
  3. com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)
  4. com.google.common.util.concurrent.RateLimiter#reserveAndGetWaitLength
  5. com.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable

com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)

  1. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  2. long timeoutMicros = max(unit.toMicros(timeout), 0);
  3. checkPermits(permits);
  4. long microsToWait;
  5. synchronized (mutex()) {
  6. long nowMicros = stopwatch.readMicros();
  7. if (!canAcquire(nowMicros, timeoutMicros)) {
  8. return false;
  9. } else {
  10. //获取令牌所需要的时长(单位:微秒)
  11. microsToWait = reserveAndGetWaitLength(permits, nowMicros);
  12. }
  13. }
  14. //休眠
  15. stopwatch.sleepMicrosUninterruptibly(microsToWait);
  16. return true;
  17. }

com.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable

  1. @Override
  2. final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  3. // 同步时间和桶里的令牌数量
  4. resync(nowMicros);
  5. // 当前请求获取成功的时间为 nextFreeTicketMicros
  6. // 与请求数量 requiredPermits 无关
  7. long returnValue = nextFreeTicketMicros;
  8. // 计算当前立即可得的许可数量
  9. double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  10. // 当前请求需要新生成的许可数量
  11. double freshPermits = requiredPermits - storedPermitsToSpend;
  12. // 生成上述数量新许可需要的时间
  13. long waitMicros =
  14. storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
  15. + (long) (freshPermits * stableIntervalMicros);
  16. // 新许可需要的时间由下一个请求承担
  17. this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  18. this.storedPermits -= storedPermitsToSpend;
  19. return returnValue;
  20. }

测试用例

  1. /**
  2. *
  3. * 功能描述:Guava令牌桶算法测试
  4. *
  5. * @param:
  6. * @return:
  7. * @auther: Zywoo Lee
  8. * @date: 2022/4/10 17:54
  9. */
  10. @org.junit.Test
  11. public void GuavaRateLimitTest() throws InterruptedException {
  12. //每秒发放两个令牌
  13. LimitItem leakyBucketItem = new GuavaRateLimit("ZywooTest", 2);
  14. AtomicInteger allowed = new AtomicInteger(0);
  15. AtomicInteger limited = new AtomicInteger(0);
  16. long beginTime = System.currentTimeMillis();
  17. for (int i = 0; i < 1000; i++) {
  18. new Thread(() -> {
  19. //System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
  20. if (leakyBucketItem.isAllowed()) {
  21. allowed.addAndGet(1);
  22. } else {
  23. limited.addAndGet(1);
  24. }
  25. }, i + "").start();
  26. //每50次休眠500毫秒
  27. if (i % 50 == 0) {
  28. Thread.sleep(500);
  29. }
  30. }
  31. long endTime = System.currentTimeMillis();
  32. //等待1s让所有线程都请求完再展示结果
  33. Thread.sleep(1000);
  34. System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
  35. System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
  36. }

第一次执行结果

执行时间:10219毫秒
限制次数:979通过次数:21,符合预期

第二次执行结果

执行时间:10224毫秒
限制次数:979通过次数:21,符合预期

第三次执行结果

执行时间:10231毫秒
限制次数:979通过次数:21,符合预期

漏桶限流算法(自定义,待完善)

  1. /**
  2. * @Auther: Zywoo Lee
  3. * @Date: 2022/4/1 20:56
  4. * @Description: 漏桶限流实现类
  5. */
  6. public class LeakyBucketItem implements LimitItem {
  7. /**
  8. * 服务名称
  9. */
  10. private String name;
  11. /**
  12. * 每秒处理数(出水率)
  13. */
  14. private long rate;
  15. /**
  16. * 当前剩余水量
  17. */
  18. private LongAdder currentWater;
  19. /**
  20. * 最后刷新时间
  21. */
  22. private long refreshTime;
  23. /**
  24. * 桶容量
  25. */
  26. private long capacity;
  27. public LeakyBucketItem(String name, long rate, long refreshTime, long capacity) {
  28. this.name = name;
  29. this.rate = rate;
  30. this.currentWater = new LongAdder();
  31. this.refreshTime = refreshTime;
  32. this.capacity = capacity;
  33. }
  34. @Override
  35. public boolean isAllowed() {
  36. //当前时间
  37. long currentTime = System.currentTimeMillis();
  38. //流出的水量 = (当前时间-上次刷新时间)*出水率
  39. long outWater = (currentTime - refreshTime) / 1000 * rate;
  40. if (currentWater.longValue()<outWater){
  41. currentWater.reset();
  42. }else{
  43. //当前水量=原先剩余水量-流出的水量
  44. currentWater.add(-outWater);
  45. }
  46. //如果当前剩余水量小于桶的容量,则请求放行
  47. if (currentWater.longValue() < capacity) {
  48. currentWater.add(1);
  49. refreshTime = currentTime;
  50. System.out.println("漏桶限流算法:<接受请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());
  51. return true;
  52. }
  53. System.out.println("漏桶限流算法:<拒绝请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());
  54. return false;
  55. }
  56. }

测试用例

  1. @org.junit.Test
  2. public void LeakyBucketTest() throws InterruptedException {
  3. // 桶容量10,每秒流出速率5
  4. LimitItem leakyBucketItem = new LeakyBucketItem("ZywooTest", 5, 0L, System.currentTimeMillis(), 10);
  5. AtomicInteger allowed = new AtomicInteger(0);
  6. AtomicInteger limited = new AtomicInteger(0);
  7. long beginTime = System.currentTimeMillis();
  8. for (int i = 0; i < 1000; i++) {
  9. new Thread(() -> {
  10. //System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
  11. if (leakyBucketItem.isAllowed()) {
  12. allowed.addAndGet(1);
  13. } else {
  14. limited.addAndGet(1);
  15. }
  16. }, i + "").start();
  17. //每50次休眠500毫秒
  18. if (i % 50 == 0) {
  19. Thread.sleep(500);
  20. }
  21. }
  22. long endTime = System.currentTimeMillis();
  23. //等待1s让所有线程都请求完再展示结果
  24. Thread.sleep(1000);
  25. System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
  26. System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
  27. }

设置桶容量10,每秒流出速率5,设置1000个线程,每请求50次休眠500毫秒。

第一次执行结果

执行时间:10214毫秒
限制次数:945通过次数:55

也就是说执行时间为50.829秒,在首个执行的1s内,前100个请求有10个请求是能放行的,在之后的9s~10s(10.2s减去1s在这个范围内),每秒只有5个请求能放行的(每秒流出速率5)。预期:10+9 5=55或10+10 5=60。通过次数55在范围内,符合预期。

第二次执行结果

执行时间:10224毫秒
限制次数:945通过次数:55

符合预期

第三次执行结果

执行时间:10241毫秒
限制次数:940通过次数:60

符合预期

Github地址

https://github.com/leeyiyu/dubbo-limit