@Author:zxw
@Email:502513206@qq.com
目录
- Sentinel源码分析(一) - 初识Sentinel
- Sentinel源码分析(二) - Entry构建
1.前言
之前已经分析过Entry的构造流程,我们知道Sentinel关于规则和流控的所有逻辑都在其内部的ProcessorSlotChain
调用链路上,首先看看接口提供的方法有哪些。对于链表结构我们清楚,就是不断获取下一个节点,然后进行方法调用。这里该接口提供了两个方法分别为入口entry
和出口exit
。对于出口方法更多的是用来做些统计,比如断路器就可以通过出口的方法来判断是否开启。 ```java void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object… args);
以下是Sentinel官方提供的链路调用图<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12949707/1644203424930-452d7b0f-61a4-42be-9279-57d28afa9772.png#clientId=u0bf7b0a6-3290-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=202&id=u01e2de47&margin=%5Bobject%20Object%5D&name=image.png&originHeight=202&originWidth=802&originalType=binary&ratio=1&rotation=0&showTitle=false&size=60152&status=done&style=none&taskId=u75ecabe3-2277-4bb5-a23f-e6c6c7c411c&title=&width=802)<br />这两个方法的调用逻辑也如同上图,在开始时顺着链路执行`entry`方法,在结束时顺着链路执行`exit`方法<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12949707/1644203545258-36f5afe8-6b92-4c9c-b3c6-925f3d36c9df.png#clientId=u0bf7b0a6-3290-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=167&id=u9be3bf56&margin=%5Bobject%20Object%5D&name=image.png&originHeight=167&originWidth=611&originalType=binary&ratio=1&rotation=0&showTitle=false&size=3845&status=done&style=none&taskId=u45299fe1-118d-4185-8ba3-0bbe593dc01&title=&width=611)
<a name="Z0jU4"></a>
# 2.源码分析
上篇文章分析过构造`ProcessorSlotChain`链路是通过Spi机制将其实例化的,并且会通过`@Spi`注解上的order字段进行排序,这里已经通过源码上的注解标识对调用顺序进行了排序
1. NodeSelectorSlot:调用链路
1. ClusterBuilderSlot:统计簇点
1. LogSlot:日志
1. StatisticSlot:统计
1. AuthoritySlot:来源访问控制
1. SystemSlot:系统保护
1. FlowSlot:流量控制
1. DegradeSlot:熔断降级
1. ParamFlowSlot:热点参数限流
在代码中的结构如下<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12949707/1644057206163-d7c0c377-d1c3-4d82-bac1-f500f13c86fb.png#clientId=ub45639ce-9d93-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=247&id=u2ebe23ff&margin=%5Bobject%20Object%5D&name=image.png&originHeight=247&originWidth=400&originalType=binary&ratio=1&rotation=0&showTitle=false&size=16649&status=done&style=none&taskId=u59d6e27c-cefd-47b0-8005-f708288fce6&title=&width=400)<br />其实会发现和Sentinel官方提供的实现图还是有些细微的出入的<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12949707/1644057246620-d80ab44c-3836-4d6e-912a-411fad2358b1.png#clientId=ub45639ce-9d93-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=264&id=uef7381e4&margin=%5Bobject%20Object%5D&name=image.png&originHeight=264&originWidth=799&originalType=binary&ratio=1&rotation=0&showTitle=false&size=63875&status=done&style=none&taskId=u6954b4e0-cec2-4b1f-b84d-6670000a2a2&title=&width=799)<br />接下来就对链路中的节点进行分析
<a name="fCo0Q"></a>
## 2.1 NodeSelectorSlot
该节点的作用就是收集资源路径,并将资源路径以树状结构存储起来,用于根据调用路径来限流降级。
```java
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
2.2 ClusterBuilderSlot
此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及原始调用者统计信息列表。来源调用者的名字由 ContextUtil.enter(contextName,origin) 中的 origin 标记
2.3 LogSlot
根据名字可知该节点是个日志记录节点,不过收集的是发生BlockException
异常后的日志
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
2.4 StatisticSlot
该节点是个用于统计实时的调用数据。比如资源节点通过的请求数量、线程数量等等。
- clusterNode:资源唯一标识的 ClusterNode 的 runtime 统计
- origin:根据来自不同调用者的统计信息
- defaultnode: 根据上下文条目名称和资源 ID 的 runtime 统计
- 入口的统计
但是统计一般是请求结束后进行统计,所以会将调用传个下个节点,等下个节点执行完毕后在进行统计
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
2.5 AuthoritySlot
在该节点中,可以配置我们应用的黑白名单。在我们的Context上下文中有个origin字段来区分应用的来源,在访问就可以通过Sentinel配置应用访问黑白名单限制,以此来拒绝某些origin的访问
public class Context {
private String origin = "";
}
具体的配置通过配置规则时,配置AuthorityRule
来实现
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
以下是添加授权规则的测试代码
public static void initAuthorityRule(){
List<AuthorityRule> rules = new ArrayList<>();
AuthorityRule rule = new AuthorityRule();
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
rule.setResource("HelloWorld");
rule.setLimitApp("hello");
AuthorityRuleManager.loadRules(rules);
}
2.6 SystemSlot
该节点会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。注意系统规则只对入口流量起作用(调用类型为 EntryType.IN)。该检查默认是关闭状态的需要手动开启,主要会在以下几个方面进行判断
- qps
- 当前线程数
- 平均响应时间
- 平均响应时间
- cpu使用率
当超过系统预期值时,会抛出SystemBlockException
异常
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
2.7 FlowSlot
该节点主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:
- 指定应用生效的规则,即针对调用方限流的;
- 调用方为 other 的规则;
- 调用方为 default 的规则。
我们在配置规则的时候,是个List列表,Sentinel则通过将所有的Rule取出然后循环遍历
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
2.8 DegradeSlot
该节点主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
2.9 ParamFlowSlot
该节点是个热点参数流控
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args != null) {
if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
Iterator var5 = rules.iterator();
ParamFlowRule rule;
do {
if (!var5.hasNext()) {
return;
}
rule = (ParamFlowRule)var5.next();
this.applyRealParamIdx(rule, args.length);
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
} while(ParamFlowChecker.passCheck(resourceWrapper, rule, count, args));
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
3.总结
目前大概对Sentinel所有框架提供的链路节点已经有了一个初步的了解,接下来就深入到节点中看具体的实现。