前言
最近在学习限流算法,想把四种限流算法整合进入Dubbo
里,于是用Dubbo
的Filter
来实现限流。
Dubbo版本
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.7</version>
准备工作
构建三个模块,dubbo-api(dubbo暴露的接口),dubbo-provider(dubbo提供者),dubbo-consumer(dubbo消费者)
dubbo-api
public interface DemoService {
String sayHello(String name);
}
dubbo-provider
@DubboService
public class DemoServiceImpl implements DemoService {
@Value("${dubbo.application.name}")
private String serviceName;
@Override
public String sayHello(String name) {
return String.format("[%s] : Hello, %s", serviceName, name);
}
}
Dubbo-consumer
@RestController
@RequestMapping("/test")
@Api(tags = "Dubbo服务测试类")
public class DemoController {
/**
* 为了调试限流,关闭超时报错,关闭重试机制
*/
@DubboReference(timeout = 1000000,retries=0)
private DemoService demoService;
@ApiOperation(value = "服务调用测试", notes = "")
@GetMapping("sayHello")
public String sayHello() {
String name = demoService.sayHello("ZywooLee-debug");
return name;
}
}
固定窗口限流算法(Dubbo内嵌)
Dubbo内嵌了令牌桶限流了,不过默认并没有开启,需要在服务提供者里实现SPI,在dubbo-provider/src/main/resources/META-INF/dubbo
路径下创建文件org.apache.dubbo.rpc.Filter
文件内容:
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
并在服务提供者上设置配置,filter会按照数组里配置的先后顺序执行(责任链设计模式),这里指定了tps
(也就是SPI设置文件里的tps
)
//选择Dubbo令牌限流算法
@DubboService(filter = {"tps"},parameters = {"tps","2","tps.interval","1000"})、
public class DemoServiceImpl implements DemoService {
//...
}
接下来查看我们设置的org.apache.dubbo.rpc.filter.TpsLimitFilter
//TPS_LIMIT_RATE_KEY为“tps”,在org.apache.dubbo.rpc.Filter文件里指定的tps对应这里
@Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
return invoker.invoke(invocation);
}
}
可以看到限流的是在org.apache.dubbo.rpc.filter.tps.DefaultTPSLimiter
处理的
public class DefaultTPSLimiter implements TPSLimiter {
//key->接口名称(如:com.dubbo.api.DemoService) value:StatItem类(令牌桶算法实现)
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
// 窗口内限制请求数
int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);
// 窗口长度(单位:ms)
long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);
// 服务名称
String serviceKey = url.getServiceKey();
if (rate > 0) {
// 从缓存中获取限流实现对象
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
} else {
//限制请求数改变,重新构建限流实现对象
if (statItem.getRate() != rate || statItem.getInterval() != interval) {
stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
}
return statItem.isAllowable();
} else {
// 限流数小于等于0,remove限流实现对象,放行
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
限流的实现类是org.apache.dubbo.rpc.filter.tps.StatItem
class StatItem {
//服务名称
private String name;
//上一次访问时间
private long lastResetTime;
//窗口长度(单位:ms)
private long interval;
//计数器
private LongAdder token;
//窗口内限制请求数
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = buildLongAdder(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
//当前时间戳>上次访问时间+窗口长度,计数器初始化为限制请求数
if (now > lastResetTime + interval) {
token = buildLongAdder(rate);
lastResetTime = now;
}
//计数器小于0,此时间窗口内访问数量已用完
if (token.sum() < 0) {
return false;
}
//计数器次数-1
token.decrement();
return true;
}
public long getInterval() {
return interval;
}
public int getRate() {
return rate;
}
long getLastResetTime() {
return lastResetTime;
}
long getToken() {
return token.sum();
}
@Override
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(", ")
.append("rate = ").append(rate).append(", ")
.append("interval = ").append(interval).append("]")
.toString();
}
private LongAdder buildLongAdder(int rate) {
LongAdder adder = new LongAdder();
adder.add(rate);
return adder;
}
}
自定义限流实现
因为是自行实现的限流,所以需要先扩展SPI,首先我先定义一个自己的ZywooLimitFilter
,并在org.apache.dubbo.rpc.Filter
文件里添加配置
#这个是原先Dubbo提供的TPSLimit
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
#这个是我自行实现的Filter
zywoo=com.dubbo.filter.ZywooLimitFilter
与org.apache.dubbo.rpc.filter.TpsLimitFilter
基本无异,只是指定了@Activate
的value
,还有将TPSLimiter
的实现改为ZywooTPSLimiter
,代码如下
package com.dubbo.filter;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.filter.tps.TPSLimiter;
/**
* @Auther: Zywoo Lee
* @Date: 2022/4/1 20:46
* @Description: 自定义Filter实现
*/
@Activate(group = CommonConstants.PROVIDER, value = "zywoo")
public class ZywooLimitFilter implements Filter {
//Dubbo默认实现
//private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
//自定义实现
private final TPSLimiter tpsLimiter = new ZywooTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
return invoker.invoke(invocation);
}
}
com.dubbo.filter.ZywooLimitFilter
集成了剩余三种限流算法
public class ZywooTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, LimitItem> stats = new ConcurrentHashMap<>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
String serviceKey = url.getServiceKey();
//选择限流的策略
String strategy = url.getParameter("strategy");
//获取限流实现类
LimitItem limitItem = stats.get(serviceKey);
if (limitItem == null) {
switch (strategy) {
//漏桶限流算法
case "leakyBucket": {
long rate = url.getParameter("rate", -1L);
long capacity = url.getParameter("capacity", -1L);
long currentTime = System.currentTimeMillis();
stats.putIfAbsent(serviceKey, new LeakyBucketItem(serviceKey, rate, 0L, currentTime, capacity));
break;
}
//固定窗口限流算法
case "fixedWindow": {
//默认1000毫秒滑动一次
long windowUnit = url.getParameter("windowUnit", 1000L);
//获取限流次数
int limitCount = url.getParameter("limitCount", 0);
long currentTime = System.currentTimeMillis();
stats.putIfAbsent(serviceKey, new FixedWindowItem(serviceKey, windowUnit, currentTime, 0, limitCount));
}
//滑动窗口限流算法
case "slidingWindow": {
//限流个数
int limitCount = url.getParameter("limitCount", 1);
//时间窗数量
int sampleCount = url.getParameter("sampleCount", 10);
//时间窗长度
int intervalInMs = url.getParameter("intervalInMs", 1000);
stats.putIfAbsent(serviceKey,new SlidingWindowItem(limitCount,sampleCount,intervalInMs));
}
//无获取到限流算法,直接放行
default: {
return true;
}
}
limitItem = stats.get(serviceKey);
}
return limitItem.isAllowed();
}
}
滑动窗口限流算法(参考Sentinel)
滑动窗口算法我参考了sentinel
来进行实现。主要是将WindowWrap
里的value
固定为LongAdder
只记录小时间窗(时间窗长度/时间窗个数)里的访问次数(Sentinel
里的value实际上是MetricBucket
类,内部维护了LongAdder
数组,来去记录这个时间里通过、阻塞、报错、限流的次数分别多少我这边精简实现,记录通过数量即可)
滑动窗口的实现com.dubbo.limit.slidingWindow.LeapArray
/**
* @Auther: Zywoo Lee
* @Date: 2022/4/2 15:57
* @Description: 滑动窗口限流算法
*/
public class LeapArray implements LimitItem {
// 样本窗口长度
private int windowLengthInMs;
// 一个时间窗中包含的时间窗数量
private int sampleCount;
// 时间窗长度
private int intervalInMs;
//元素为WindowWrap样本窗口
private final AtomicReferenceArray<WindowWrap> array;
//修改锁
private final ReentrantLock updateLock = new ReentrantLock();
/**
*
* 功能描述: 构造函数
*
* @param: sampleCount 时间窗数量 intervalInMs 时间窗长度
* @return:
* @auther: Zywoo Lee
* @date: 2022/4/5 14:19
*/
public LeapArray(int sampleCount, int intervalInMs){
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray(sampleCount);
}
public WindowWrap currentWindow(){
// 获取当前时间点所在的样本窗口
return currentWindow(System.currentTimeMillis());
}
@Override
public boolean isAllowed() {
//获取当前时间段在哪个滑动窗口
return true;
}
public WindowWrap currentWindow(long timeMillis){
if (timeMillis < 0) {
return null;
}
// 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引
int idx = calculateTimeIdx(timeMillis);
// 计算当前样本窗口的开始时间点
long windowStart = calculateWindowStart(timeMillis);
while(true){
WindowWrap old = array.get(idx);
// 若当前时间所在样本窗口为null,说明该样本窗口还不存在,则创建一个
if (old==null){
WindowWrap window = new WindowWrap(windowLengthInMs, windowStart,new LongAdder());
if (array.compareAndSet(idx,null,window)){
return window;
}else{
Thread.yield();
}
}
// 若当前样本窗口的起始时间点与计算出的样本窗口起始时间点相同,
// 则说明这两个是同一个样本窗口
else if(windowStart==old.windowStart()){
return old;
}
// 若当前样本窗口的起始时间点 大于 计算出的样本窗口起始时间点,
// 说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换
else if (windowStart > old.windowStart()){
if (updateLock.tryLock()) {
try {
// 替换掉老的样本窗口
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
}
// 当前样本窗口的起始时间点 小于 计算出的样本窗口起始时间点,
// 这种情况一般不会出现,因为时间不会倒流。除非人为修改了系统时钟
else if (windowStart < old.windowStart()){
return new WindowWrap(windowLengthInMs, windowStart,new LongAdder());
}
}
}
public int calculateTimeIdx(long timeMillis) {
// 计算出当前时间在哪个样本窗口
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
private long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
private WindowWrap resetWindowTo(WindowWrap w, long startTime) {
System.out.println("重置时间窗");
// 更新窗口起始时间
w.resetTo(startTime);
// 将访问次数数据清零
w.value().reset();
return w;
}
public List values() {
return values(System.currentTimeMillis());
}
public List values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList();
}
int size = array.length();
List result = new ArrayList(size);
// 逐个遍历array中的每一个样本窗口实例
for (int i = 0; i < size; i++) {
WindowWrap windowWrap = array.get(i);
// 若当前遍历实例为空或已经过时,则继续下一个
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
// 将当前遍历的样本窗口统计的数据记录到result中
result.add(windowWrap.value());
}
return result;
}
public boolean isWindowDeprecated(long time, WindowWrap windowWrap) {
// 当前时间与当前样本窗口的时间差 大于 时间窗长度,
// 说明当前样本窗口已经过时
return time - windowWrap.windowStart() > intervalInMs;
}
public int getWindowLengthInMs() {
return windowLengthInMs;
}
}
统计类com.dubbo.limit.slidingWindow.WindowWrap
public class WindowWrap {
/**
* 样本窗口长度
*/
private final long windowLengthInMs;
/**
* 样本窗口的起始时间戳
*/
private long windowStart;
/**
* 当前样本窗口中的统计数据
*/
private LongAdder value;
public WindowWrap(long windowLengthInMs, long windowStart,LongAdder value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
/**
*
* 功能描述:重置设置此窗口的开始时间戳
*
* @param:
* @return:
* @auther: Zywoo Lee
* @date: 2022/4/5 14:11
*/
public WindowWrap resetTo(long startTime){
this.windowStart = startTime;
return this;
}
/**
*
* 功能描述:判断时间戳是否在此时间窗口内
*
* @param:
* @return:
* @auther: Zywoo Lee
* @date: 2022/4/5 14:11
*/
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
public long windowStart() {
return windowStart;
}
public LongAdder value() {
return value;
}
}
测试用例
@org.junit.Test
public void SlidingWindowTest() throws InterruptedException {
//时间窗长度为1s,分割为10个滑动窗口,时间窗内只能有150个放行
LimitItem slidingWindowItem = new SlidingWindowItem(150,10,1000);
AtomicInteger allowed = new AtomicInteger(0);
AtomicInteger limited = new AtomicInteger(0);
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 2000; i++) {
new Thread(() -> {
//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
if (slidingWindowItem.isAllowed()) {
allowed.addAndGet(1);
} else {
limited.addAndGet(1);
}
}, i + "").start();
//每20次休眠100毫秒
if (i % 20 == 0) {
Thread.sleep(100);
}
}
long endTime = System.currentTimeMillis();
//等待1s让所有线程都请求完再展示结果
Thread.sleep(1000);
System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
}
首次测试结果
执行时间:10570毫秒
限制次数:414通过次数:1586
1586/10.570=150.047304,约等于每秒150,符合预期
第二次测试结果
执行时间:10593毫秒
限制次数:415通过次数:1585
1585/10.593=149.627112,约等于每秒150,符合预期
第三次测试结果
执行时间:10640毫秒
限制次数:409通过次数:1591
1591/10.64=149.530075,约等于每秒150,符合预期
令牌桶限流算法(Guava)
令牌桶限流算法我引入了Guava
的RateLimiter
版本
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0-jre</version>
<type>pom</type>
</dependency>
/**
* @Auther: Zywoo Lee
* @Date: 2022/4/10 15:00
* @Description: Guava令牌桶限流算法
*/
public class GuavaRateLimit implements LimitItem{
/**
* 服务名称
*/
String name;
/**
* Guava限流实现
*/
RateLimiter limit;
/**
*
* @param name 服务名称
* @param permitsPerSecond 每秒限制通过数
*/
public GuavaRateLimit(String name,double permitsPerSecond) {
this.name = name;
limit = RateLimiter.create(permitsPerSecond);
}
@Override
public boolean isAllowed() {
return limit.tryAcquire();
}
}
调用核心链路:
com.dubbo.limit.GuavaRateLimit#isAllowed
com.google.common.util.concurrent.RateLimiter#tryAcquire()
com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)
com.google.common.util.concurrent.RateLimiter#reserveAndGetWaitLength
com.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable
com.google.common.util.concurrent.RateLimiter#tryAcquire(int, long, java.util.concurrent.TimeUnit)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
//获取令牌所需要的时长(单位:微秒)
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
//休眠
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
com.google.common.util.concurrent.SmoothRateLimiter#reserveEarliestAvailable
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 同步时间和桶里的令牌数量
resync(nowMicros);
// 当前请求获取成功的时间为 nextFreeTicketMicros
// 与请求数量 requiredPermits 无关
long returnValue = nextFreeTicketMicros;
// 计算当前立即可得的许可数量
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 当前请求需要新生成的许可数量
double freshPermits = requiredPermits - storedPermitsToSpend;
// 生成上述数量新许可需要的时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 新许可需要的时间由下一个请求承担
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
测试用例
/**
*
* 功能描述:Guava令牌桶算法测试
*
* @param:
* @return:
* @auther: Zywoo Lee
* @date: 2022/4/10 17:54
*/
@org.junit.Test
public void GuavaRateLimitTest() throws InterruptedException {
//每秒发放两个令牌
LimitItem leakyBucketItem = new GuavaRateLimit("ZywooTest", 2);
AtomicInteger allowed = new AtomicInteger(0);
AtomicInteger limited = new AtomicInteger(0);
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
if (leakyBucketItem.isAllowed()) {
allowed.addAndGet(1);
} else {
limited.addAndGet(1);
}
}, i + "").start();
//每50次休眠500毫秒
if (i % 50 == 0) {
Thread.sleep(500);
}
}
long endTime = System.currentTimeMillis();
//等待1s让所有线程都请求完再展示结果
Thread.sleep(1000);
System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
}
第一次执行结果
执行时间:10219毫秒
限制次数:979通过次数:21,符合预期
第二次执行结果
执行时间:10224毫秒
限制次数:979通过次数:21,符合预期
第三次执行结果
执行时间:10231毫秒
限制次数:979通过次数:21,符合预期
漏桶限流算法(自定义,待完善)
/**
* @Auther: Zywoo Lee
* @Date: 2022/4/1 20:56
* @Description: 漏桶限流实现类
*/
public class LeakyBucketItem implements LimitItem {
/**
* 服务名称
*/
private String name;
/**
* 每秒处理数(出水率)
*/
private long rate;
/**
* 当前剩余水量
*/
private LongAdder currentWater;
/**
* 最后刷新时间
*/
private long refreshTime;
/**
* 桶容量
*/
private long capacity;
public LeakyBucketItem(String name, long rate, long refreshTime, long capacity) {
this.name = name;
this.rate = rate;
this.currentWater = new LongAdder();
this.refreshTime = refreshTime;
this.capacity = capacity;
}
@Override
public boolean isAllowed() {
//当前时间
long currentTime = System.currentTimeMillis();
//流出的水量 = (当前时间-上次刷新时间)*出水率
long outWater = (currentTime - refreshTime) / 1000 * rate;
if (currentWater.longValue()<outWater){
currentWater.reset();
}else{
//当前水量=原先剩余水量-流出的水量
currentWater.add(-outWater);
}
//如果当前剩余水量小于桶的容量,则请求放行
if (currentWater.longValue() < capacity) {
currentWater.add(1);
refreshTime = currentTime;
System.out.println("漏桶限流算法:<接受请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());
return true;
}
System.out.println("漏桶限流算法:<拒绝请求>当前容量:" + capacity + "剩余容量:" + currentWater.longValue());
return false;
}
}
测试用例
@org.junit.Test
public void LeakyBucketTest() throws InterruptedException {
// 桶容量10,每秒流出速率5
LimitItem leakyBucketItem = new LeakyBucketItem("ZywooTest", 5, 0L, System.currentTimeMillis(), 10);
AtomicInteger allowed = new AtomicInteger(0);
AtomicInteger limited = new AtomicInteger(0);
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
//System.out.println(Thread.currentThread().getName()+":"+leakyBucketItem.isAllowed());
if (leakyBucketItem.isAllowed()) {
allowed.addAndGet(1);
} else {
limited.addAndGet(1);
}
}, i + "").start();
//每50次休眠500毫秒
if (i % 50 == 0) {
Thread.sleep(500);
}
}
long endTime = System.currentTimeMillis();
//等待1s让所有线程都请求完再展示结果
Thread.sleep(1000);
System.out.println("执行时间:" + (endTime - beginTime) + "毫秒");
System.out.println("限制次数:" + limited.get() + "通过次数:" + allowed.get());
}
设置桶容量10,每秒流出速率5,设置1000个线程,每请求50次休眠500毫秒。
第一次执行结果
执行时间:10214毫秒
限制次数:945通过次数:55
也就是说执行时间为50.829秒,在首个执行的1s内,前100个请求有10个请求是能放行的,在之后的9s~10s(10.2s减去1s在这个范围内),每秒只有5个请求能放行的(每秒流出速率5)。预期:10+9 5=55或10+10 5=60。通过次数55在范围内,符合预期。
第二次执行结果
执行时间:10224毫秒
限制次数:945通过次数:55
符合预期
第三次执行结果
执行时间:10241毫秒
限制次数:940通过次数:60
符合预期