前言
最近在学习限流算法,想把四种限流算法整合进入Dubbo里,于是用Dubbo的Filter来实现限流。
Dubbo版本
<groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>2.7.7</version>
准备工作
构建三个模块,dubbo-api(dubbo暴露的接口),dubbo-provider(dubbo提供者),dubbo-consumer(dubbo消费者)
dubbo-api
public interface DemoService {String sayHello(String name);}
dubbo-provider
@DubboServicepublic class DemoServiceImpl implements DemoService {@Value("${dubbo.application.name}")private String serviceName;@Overridepublic String sayHello(String name) {return String.format("[%s] : Hello, %s", serviceName, name);}}
Dubbo-consumer
@RestController@RequestMapping("/test")@Api(tags = "Dubbo服务测试类")public class DemoController {/*** 为了调试限流,关闭超时报错,关闭重试机制*/@DubboReference(timeout = 1000000,retries=0)private DemoService demoService;@ApiOperation(value = "服务调用测试", notes = "")@GetMapping("sayHello")public String sayHello() {String name = demoService.sayHello("ZywooLee-debug");return name;}}
固定窗口限流算法(Dubbo内嵌)
Dubbo内嵌了令牌桶限流了,不过默认并没有开启,需要在服务提供者里实现SPI,在dubbo-provider/src/main/resources/META-INF/dubbo路径下创建文件org.apache.dubbo.rpc.Filter
文件内容:
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
并在服务提供者上设置配置,filter会按照数组里配置的先后顺序执行(责任链设计模式),这里指定了tps(也就是SPI设置文件里的tps)
//选择Dubbo令牌限流算法@DubboService(filter = {"tps"},parameters = {"tps","2","tps.interval","1000"})、public class DemoServiceImpl implements DemoService {//...}
接下来查看我们设置的org.apache.dubbo.rpc.filter.TpsLimitFilter
//TPS_LIMIT_RATE_KEY为“tps”,在org.apache.dubbo.rpc.Filter文件里指定的tps对应这里@Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)public class TpsLimitFilter implements Filter {private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {throw new RpcException("Failed to invoke service " +invoker.getInterface().getName() +"." +invocation.getMethodName() +" because exceed max service tps.");}return invoker.invoke(invocation);}}
可以看到限流的是在org.apache.dubbo.rpc.filter.tps.DefaultTPSLimiter处理的
public class DefaultTPSLimiter implements TPSLimiter {//key->接口名称(如:com.dubbo.api.DemoService) value:StatItem类(令牌桶算法实现)private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();@Overridepublic boolean isAllowable(URL url, Invocation invocation) {// 窗口内限制请求数int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);// 窗口长度(单位:ms)long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);// 服务名称String serviceKey = url.getServiceKey();if (rate > 0) {// 从缓存中获取限流实现对象StatItem statItem = stats.get(serviceKey);if (statItem == null) {stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));statItem = stats.get(serviceKey);} else {//限制请求数改变,重新构建限流实现对象if (statItem.getRate() != rate || statItem.getInterval() != interval) {stats.put(serviceKey, new StatItem(serviceKey, rate, interval));statItem = stats.get(serviceKey);}}return statItem.isAllowable();} else {// 限流数小于等于0,remove限流实现对象,放行StatItem statItem = stats.get(serviceKey);if (statItem != null) {stats.remove(serviceKey);}}return true;}}
限流的实现类是org.apache.dubbo.rpc.filter.tps.StatItem
class StatItem {//服务名称private String name;//上一次访问时间private long lastResetTime;//窗口长度(单位:ms)private long interval;//计数器private LongAdder token;//窗口内限制请求数private int rate;StatItem(String name, int rate, long interval) {this.name = name;this.rate = rate;this.interval = interval;this.lastResetTime = System.currentTimeMillis();this.token = buildLongAdder(rate);}public boolean isAllowable() {long now = System.currentTimeMillis();//当前时间戳>上次访问时间+窗口长度,计数器初始化为限制请求数if (now > lastResetTime + interval) {token = buildLongAdder(rate);lastResetTime = now;}//计数器小于0,此时间窗口内访问数量已用完if (token.sum() < 0) {return false;}//计数器次数-1token.decrement();return true;}public long getInterval() {return interval;}public int getRate() {return rate;}long getLastResetTime() {return lastResetTime;}long getToken() {return token.sum();}@Overridepublic String toString() {return new StringBuilder(32).append("StatItem ").append("[name=").append(name).append(", ").append("rate = ").append(rate).append(", ").append("interval = ").append(interval).append("]").toString();}private LongAdder buildLongAdder(int rate) {LongAdder adder = new LongAdder();adder.add(rate);return adder;}}
自定义限流实现
因为是自行实现的限流,所以需要先扩展SPI,首先我先定义一个自己的ZywooLimitFilter,并在org.apache.dubbo.rpc.Filter文件里添加配置
#这个是原先Dubbo提供的TPSLimittps=org.apache.dubbo.rpc.filter.TpsLimitFilter#这个是我自行实现的Filterzywoo=com.dubbo.filter.ZywooLimitFilter
与org.apache.dubbo.rpc.filter.TpsLimitFilter基本无异,只是指定了@Activate的value,还有将TPSLimiter的实现改为ZywooTPSLimiter,代码如下
package com.dubbo.filter;import org.apache.dubbo.common.constants.CommonConstants;import org.apache.dubbo.common.extension.Activate;import org.apache.dubbo.rpc.*;import org.apache.dubbo.rpc.filter.tps.TPSLimiter;/*** @Auther: Zywoo Lee* @Date: 2022/4/1 20:46* @Description: 自定义Filter实现*/@Activate(group = CommonConstants.PROVIDER, value = "zywoo")public class ZywooLimitFilter implements Filter {//Dubbo默认实现//private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();//自定义实现private final TPSLimiter tpsLimiter = new ZywooTPSLimiter();@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {throw new RpcException("Failed to invoke service " +invoker.getInterface().getName() +"." +invocation.getMethodName() +" because exceed max service tps.");}return invoker.invoke(invocation);}}
com.dubbo.filter.ZywooLimitFilter集成了剩余三种限流算法
public class ZywooTPSLimiter implements TPSLimiter {private final ConcurrentMap<String, LimitItem> stats = new ConcurrentHashMap<>();@Overridepublic boolean isAllowable(URL url, Invocation invocation) {String serviceKey = url.getServiceKey();//选择限流的策略String strategy = url.getParameter("strategy");//获取限流实现类LimitItem limitItem = stats.get(serviceKey);if (limitItem == null) {switch (strategy) {//漏桶限流算法case "leakyBucket": {long rate = url.getParameter("rate", -1L);long capacity = url.getParameter("capacity", -1L);long currentTime = System.currentTimeMillis();stats.putIfAbsent(serviceKey, new LeakyBucketItem(serviceKey, rate, 0L, currentTime, capacity));break;}//固定窗口限流算法case "fixedWindow": {//默认1000毫秒滑动一次long windowUnit = url.getParameter("windowUnit", 1000L);//获取限流次数int limitCount = url.getParameter("limitCount", 0);long currentTime = System.currentTimeMillis();stats.putIfAbsent(serviceKey, new FixedWindowItem(serviceKey, windowUnit, currentTime, 0, limitCount));}//滑动窗口限流算法case "slidingWindow": {//限流个数int limitCount = url.getParameter("limitCount", 1);//时间窗数量int sampleCount = url.getParameter("sampleCount", 10);//时间窗长度int intervalInMs = url.getParameter("intervalInMs", 1000);stats.putIfAbsent(serviceKey,new SlidingWindowItem(limitCount,sampleCount,intervalInMs));}//无获取到限流算法,直接放行default: {return true;}}limitItem = stats.get(serviceKey);}return limitItem.isAllowed();}}
滑动窗口限流算法(参考Sentinel)
滑动窗口算法我参考了sentinel来进行实现。主要是将WindowWrap里的value固定为LongAdder只记录小时间窗(时间窗长度/时间窗个数)里的访问次数(Sentinel里的value实际上是MetricBucket类,内部维护了LongAdder数组,来去记录这个时间里通过、阻塞、报错、限流的次数分别多少我这边精简实现,记录通过数量即可)
滑动窗口的实现com.dubbo.limit.slidingWindow.LeapArray
/*** @Auther: Zywoo Lee* @Date: 2022/4/2 15:57* @Description: 滑动窗口限流算法*/public class LeapArray implements LimitItem {// 样本窗口长度private int windowLengthInMs;// 一个时间窗中包含的时间窗数量private int sampleCount;// 时间窗长度private int intervalInMs;//元素为WindowWrap样本窗口private final AtomicReferenceArray<WindowWrap> array;//修改锁private final ReentrantLock updateLock = new ReentrantLock();/**** 功能描述: 构造函数** @param: sampleCount 时间窗数量 intervalInMs 时间窗长度* @return:* @auther: Zywoo Lee* @date: 2022/4/5 14:19*/public LeapArray(int sampleCount, int intervalInMs){this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray(sampleCount);}public WindowWrap currentWindow(){// 获取当前时间点所在的样本窗口return currentWindow(System.currentTimeMillis());}@Overridepublic boolean isAllowed() {//获取当前时间段在哪个滑动窗口return true;}public WindowWrap currentWindow(long timeMillis){if (timeMillis < 0) {return null;}// 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引int idx = calculateTimeIdx(timeMillis);// 计算当前样本窗口的开始时间点long windowStart = calculateWindowStart(timeMillis);while(true){WindowWrap old = array.get(idx);// 若当前时间所在样本窗口为null,说明该样本窗口还不存在,则创建一个if (old==null){WindowWrap window = new WindowWrap(windowLengthInMs, windowStart,new LongAdder());if (array.compareAndSet(idx,null,window)){return window;}else{Thread.yield();}}// 若当前样本窗口的起始时间点与计算出的样本窗口起始时间点相同,// 则说明这两个是同一个样本窗口else if(windowStart==old.windowStart()){return old;}// 若当前样本窗口的起始时间点 大于 计算出的样本窗口起始时间点,// 说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换else if (windowStart > old.windowStart()){if (updateLock.tryLock()) {try {// 替换掉老的样本窗口return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}}// 当前样本窗口的起始时间点 小于 计算出的样本窗口起始时间点,// 这种情况一般不会出现,因为时间不会倒流。除非人为修改了系统时钟else if (windowStart < old.windowStart()){return new WindowWrap(windowLengthInMs, windowStart,new LongAdder());}}}public int calculateTimeIdx(long timeMillis) {// 计算出当前时间在哪个样本窗口long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}private long calculateWindowStart(long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;}private WindowWrap resetWindowTo(WindowWrap w, long startTime) {System.out.println("重置时间窗");// 更新窗口起始时间w.resetTo(startTime);// 将访问次数数据清零w.value().reset();return w;}public List values() {return values(System.currentTimeMillis());}public List values(long timeMillis) {if (timeMillis < 0) {return new ArrayList();}int size = array.length();List result = new ArrayList(size);// 逐个遍历array中的每一个样本窗口实例for (int i = 0; i < size; i++) {WindowWrap windowWrap = array.get(i);// 若当前遍历实例为空或已经过时,则继续下一个if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {continue;}// 将当前遍历的样本窗口统计的数据记录到result中result.add(windowWrap.value());}return result;}public boolean isWindowDeprecated(long time, WindowWrap windowWrap) {// 当前时间与当前样本窗口的时间差 大于 时间窗长度,// 说明当前样本窗口已经过时return time - windowWrap.windowStart() > intervalInMs;}public int getWindowLengthInMs() {return windowLengthInMs;}}
统计类com.dubbo.limit.slidingWindow.WindowWrap
public class WindowWrap {/*** 样本窗口长度*/private final long windowLengthInMs;/*** 样本窗口的起始时间戳*/private long windowStart;/*** 当前样本窗口中的统计数据*/private LongAdder value;public WindowWrap(long windowLengthInMs, long windowStart,LongAdder value) {this.windowLengthInMs = windowLengthInMs;this.windowStart = windowStart;this.value = value;}/**** 功能描述:重置设置此窗口的开始时间戳** @param:* @return:* @auther: Zywoo Lee* @date: 2022/4/5 14:11*/public WindowWrap resetTo(long startTime){this.windowStart = startTime;return this;}/**** 功能描述:判断时间戳是否在此时间窗口内** @param:* @return:* @auther: Zywoo Lee* @date: 2022/4/5 14:11*/public boolean isTimeInWindow(long timeMillis) {return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;}public long windowStart() {return windowStart;}public LongAdder value() {return value;}}
测试用例
@org.junit.Testpublic void SlidingWindowTest() throws InterruptedException {//时间窗长度为1s,分割为10个滑动窗口,时间窗内只能有150个放行LimitItem slidingWindowItem = new SlidingWindowItem(150,10,1000);AtomicInteger allowed = new AtomicInteger(0);AtomicInteger limited = new AtomicInteger(0);long beginTime = System.currentTimeMillis();for (int i = 0; i < 2000; i++) {new Thread(() -> {//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());if (slidingWindowItem.isAllowed()) {allowed.addAndGet(1);} else {limited.addAndGet(1);}}, i + "").start();//每20次休眠100毫秒if (i % 20 == 0) {Thread.sleep(100);}}long endTime = System.currentTimeMillis();//等待1s让所有线程都请求完再展示结果Thread.sleep(1000);System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());}
首次测试结果
执行时间:10570毫秒
限制次数:414通过次数:1586
1586/10.570=150.047304,约等于每秒150,符合预期
第二次测试结果
执行时间:10593毫秒
限制次数:415通过次数:1585
1585/10.593=149.627112,约等于每秒150,符合预期
第三次测试结果
执行时间:10640毫秒
限制次数:409通过次数:1591
1591/10.64=149.530075,约等于每秒150,符合预期
令牌桶限流算法(Guava)
令牌桶限流算法我引入了Guava的RateLimiter
版本
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.0-jre</version><type>pom</type></dependency>
/*** @Auther: Zywoo Lee* @Date: 2022/4/10 15:00* @Description: Guava令牌桶限流算法*/public class GuavaRateLimit implements LimitItem{/*** 服务名称*/String name;/*** Guava限流实现*/RateLimiter limit;/**** @param name 服务名称* @param permitsPerSecond 每秒限制通过数*/public GuavaRateLimit(String name,double permitsPerSecond) {this.name = name;limit = RateLimiter.create(permitsPerSecond);}@Overridepublic boolean isAllowed() {return limit.tryAcquire();}}
调用核心链路:
com.dubbo.limit.GuavaRateLimit#isAllowedcom.google.common.util.concurrent.RateLimiter#tryAcquire()com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)com.google.common.util.concurrent.RateLimiter#reserveAndGetWaitLengthcom.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable
com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {long timeoutMicros = max(unit.toMicros(timeout), 0);checkPermits(permits);long microsToWait;synchronized (mutex()) {long nowMicros = stopwatch.readMicros();if (!canAcquire(nowMicros, timeoutMicros)) {return false;} else {//获取令牌所需要的时长(单位:微秒)microsToWait = reserveAndGetWaitLength(permits, nowMicros);}}//休眠stopwatch.sleepMicrosUninterruptibly(microsToWait);return true;}
com.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable
@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) {// 同步时间和桶里的令牌数量resync(nowMicros);// 当前请求获取成功的时间为 nextFreeTicketMicros// 与请求数量 requiredPermits 无关long returnValue = nextFreeTicketMicros;// 计算当前立即可得的许可数量double storedPermitsToSpend = min(requiredPermits, this.storedPermits);// 当前请求需要新生成的许可数量double freshPermits = requiredPermits - storedPermitsToSpend;// 生成上述数量新许可需要的时间long waitMicros =storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)+ (long) (freshPermits * stableIntervalMicros);// 新许可需要的时间由下一个请求承担this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);this.storedPermits -= storedPermitsToSpend;return returnValue;}
测试用例
/**** 功能描述:Guava令牌桶算法测试** @param:* @return:* @auther: Zywoo Lee* @date: 2022/4/10 17:54*/@org.junit.Testpublic void GuavaRateLimitTest() throws InterruptedException {//每秒发放两个令牌LimitItem leakyBucketItem = new GuavaRateLimit("ZywooTest", 2);AtomicInteger allowed = new AtomicInteger(0);AtomicInteger limited = new AtomicInteger(0);long beginTime = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {new Thread(() -> {//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());if (leakyBucketItem.isAllowed()) {allowed.addAndGet(1);} else {limited.addAndGet(1);}}, i + "").start();//每50次休眠500毫秒if (i % 50 == 0) {Thread.sleep(500);}}long endTime = System.currentTimeMillis();//等待1s让所有线程都请求完再展示结果Thread.sleep(1000);System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());}
第一次执行结果
执行时间:10219毫秒
限制次数:979通过次数:21,符合预期
第二次执行结果
执行时间:10224毫秒
限制次数:979通过次数:21,符合预期
第三次执行结果
执行时间:10231毫秒
限制次数:979通过次数:21,符合预期
漏桶限流算法(自定义,待完善)
/*** @Auther: Zywoo Lee* @Date: 2022/4/1 20:56* @Description: 漏桶限流实现类*/public class LeakyBucketItem implements LimitItem {/*** 服务名称*/private String name;/*** 每秒处理数(出水率)*/private long rate;/*** 当前剩余水量*/private LongAdder currentWater;/*** 最后刷新时间*/private long refreshTime;/*** 桶容量*/private long capacity;public LeakyBucketItem(String name, long rate, long refreshTime, long capacity) {this.name = name;this.rate = rate;this.currentWater = new LongAdder();this.refreshTime = refreshTime;this.capacity = capacity;}@Overridepublic boolean isAllowed() {//当前时间long currentTime = System.currentTimeMillis();//流出的水量 = (当前时间-上次刷新时间)*出水率long outWater = (currentTime - refreshTime) / 1000 * rate;if (currentWater.longValue()<outWater){currentWater.reset();}else{//当前水量=原先剩余水量-流出的水量currentWater.add(-outWater);}//如果当前剩余水量小于桶的容量,则请求放行if (currentWater.longValue() < capacity) {currentWater.add(1);refreshTime = currentTime;System.out.println("漏桶限流算法:<接受请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());return true;}System.out.println("漏桶限流算法:<拒绝请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());return false;}}
测试用例
@org.junit.Testpublic void LeakyBucketTest() throws InterruptedException {// 桶容量10,每秒流出速率5LimitItem leakyBucketItem = new LeakyBucketItem("ZywooTest", 5, 0L, System.currentTimeMillis(), 10);AtomicInteger allowed = new AtomicInteger(0);AtomicInteger limited = new AtomicInteger(0);long beginTime = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {new Thread(() -> {//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());if (leakyBucketItem.isAllowed()) {allowed.addAndGet(1);} else {limited.addAndGet(1);}}, i + "").start();//每50次休眠500毫秒if (i % 50 == 0) {Thread.sleep(500);}}long endTime = System.currentTimeMillis();//等待1s让所有线程都请求完再展示结果Thread.sleep(1000);System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());}
设置桶容量10,每秒流出速率5,设置1000个线程,每请求50次休眠500毫秒。
第一次执行结果
执行时间:10214毫秒
限制次数:945通过次数:55
也就是说执行时间为50.829秒,在首个执行的1s内,前100个请求有10个请求是能放行的,在之后的9s~10s(10.2s减去1s在这个范围内),每秒只有5个请求能放行的(每秒流出速率5)。预期:10+9 5=55或10+10 5=60。通过次数55在范围内,符合预期。
第二次执行结果
执行时间:10224毫秒
限制次数:945通过次数:55
符合预期
第三次执行结果
执行时间:10241毫秒
限制次数:940通过次数:60
符合预期
