背景:通常互联网对外暴露的接口服务都需要做流量限制,以避免被大流量冲击导致服务不可用。常见的情况如秒杀、活动等接口请求往往发生实效较短集中出现在某个时间段。对于对接方的下游客户可能会经常面临的问题:上游接口服务商限制了接口tps请求,下游接口对接方不可避免的会遇到上游接口的限流,通常会返回“调用频率超限…”,这个时候,如果只有一家上游服务商的时候,很容易根据错误码信息区分然后进行请求重试,但是如果在同时对接多家上游的时候就需要我们能够主动性的控制向上游服务商提交的数据流量,对于超过tps限制的流量,直接分发到其他服务商或者排队再次重试以达到正常请求到服务方。
源码:
废话不多说:这里直接上代码:
定义注解:根据注解确定是否限流:
package com.example.ratelimiter.annotation;
import org.redisson.api.RateIntervalUnit;
import java.lang.annotation.*;
/**
* @description: 限流
* @author 98jjy
* @date 2021/8/2
* @version 1.0
*/
@Target(value = {ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedissonRateLimit {
/**
* key前缀
*/
String prefix() default "";
/**
* 限流key
* @return
*/
String spel();
/**
* 流速
* @return
*/
long rate();
/**
* 频率
* @return
*/
long rateInterval();
/**
* 默认时间单位/秒
* @return
*/
RateIntervalUnit unit() default RateIntervalUnit.SECONDS;
/**
* -1:默认不过期
* @return
*/
int expireSecond() default -1;
/**
* 限流不通过时的处理办法
* @return
*/
Class<? extends BaseRateLimitNoPassHandler> noPassHandler() default BaseRateLimitNoPassHandler.class;
/**
* @description: 多个限流Key
* @author 98jjy
* @date 2021/8/2
* @version 1.0
*/
@Target(value = {ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface List {
RedissonRateLimit[] value();
}
}
aop切面处理限流:
package com.example.ratelimiter.config;
import com.example.ratelimiter.annotation.RedissonRateLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* @description: redssion 令牌桶算法限流
* @author 98jjy
* @date 2021/8/2
* @version 1.0
*/
@Component
@Aspect
@Slf4j
public class RedissonRateLimitAspect {
/**
* key 统一前缀
*/
private static final String REDISSON_RATE_LIMIT = "redisson_rate_limit:";
/**
* spel表达式解析器
*/
private SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
/**
* 参数名发现器
*/
private DefaultParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
@Autowired
private RedissonClient redissonClient;
@Pointcut("@annotation(com.example.ratelimiter.annotation.RedissonRateLimit.List)")
public void rateLimitList() {
}
@Pointcut("@annotation(com.example.ratelimiter.annotation.RedissonRateLimit)")
public void rateLimit() {
}
@Before("rateLimit()")
public void beforeAcquire(JoinPoint point) {
log.info("RateLimitAspect acquire");
doAcquire(point);
log.info("RateLimitAspect acquire exit");
}
@Before("rateLimitList()")
public void beforeRateLimit(JoinPoint point) throws Throwable {
log.info("RateLimitAspect acquireList");
doAcquire(point);
log.info("RateLimitAspect acquireList exit");
}
private void doAcquire(JoinPoint jp) {
Signature signature = jp.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
//获取方法
Method targetMethod = methodSignature.getMethod();
//获取参数对象数组
Object[] args = jp.getArgs();
RedissonRateLimit[] rateLimits = null;
//寻找注解
if (targetMethod.isAnnotationPresent(RedissonRateLimit.List.class)) {
RedissonRateLimit.List list = targetMethod.getAnnotation(RedissonRateLimit.List.class);
rateLimits = list.value();
} else if (targetMethod.isAnnotationPresent(RedissonRateLimit.class)) {
RedissonRateLimit limit = targetMethod.getAnnotation(RedissonRateLimit.class);
rateLimits = new RedissonRateLimit[]{limit};
}
String[] parameterNames = parameterNameDiscoverer.getParameterNames(targetMethod);
for (RedissonRateLimit limit : rateLimits) {
String spel = limit.spel();
// 获得方法参数名数组
if (parameterNames != null && parameterNames.length > 0){
EvaluationContext context = new StandardEvaluationContext();
//获取方法参数值
for (int i = 0; i < args.length; i++) {
// 替换spel里的变量值为实际值, 比如 #user --> user对象
context.setVariable(parameterNames[i],args[i]);
}
// 解析出的key
spel = spelExpressionParser.parseExpression(spel).getValue(context).toString();
}
String key = REDISSON_RATE_LIMIT + limit.prefix() + ":" +spel;
RRateLimiter rRateLimiter = redissonClient.getRateLimiter(key);
rRateLimiter.trySetRate(RateType.OVERALL, limit.rate(), limit.rateInterval(), limit.unit());
if (limit.expireSecond() !=-1) {
//限流器过期自动删除
rRateLimiter.expire(limit.expireSecond(), TimeUnit.SECONDS);
}
if (rRateLimiter.tryAcquire()) {
continue;
} else {
log.warn("rateLimiter:{} no pass", key);
try {
limit.noPassHandler().getConstructor().newInstance().handle(args);
} catch (Exception e) {
log.error("limit.noPassHandler exception:{}", e);
}
throw new RuntimeException("调用频率超限制,请稍后再试");
}
}
}
}
使用方式:
/**
* 单个参数限流
*/
@RedissonRateLimit(prefix = "trade", spel = "#trade.rootMchId", rate = 1, rateInterval = 2,
unit = RateIntervalUnit.SECONDS, expireSecond = 5,
noPassHandler = TradeRateLimitNoPassHandler.class)
@PostMapping("/trade")
public ResponseEntity trade(@RequestBody Trade trade) {
return ResponseEntity.ok("success");
}
/*
* 多个限流,注意顺序,按用户id应该放在前面,总量限流放后面
*/
@RedissonRateLimit.List({
@RedissonRateLimit(prefix = "trade:userId", rate = 5, rateInterval = 1, unit = RateIntervalUnit.SECONDS, spel = "#trade.userId", expireSecond = 5),
@RedissonRateLimit(prefix = "trade:rootMchId", rate = 300, rateInterval = 1, unit = RateIntervalUnit.SECONDS, spel = "#trade.rootMchId")})
@PostMapping("/pay")
public ResponseEntity pay(@RequestBody Trade trade){
return ResponseEntity.ok("success");
}