1. sentinel源码工程搭建
环境准备
在nacos的官网介绍中,sentinel源码运行,需要的java运行环境有:
JDK 1.8+
Maven 3.2+
源码构建
源码下载
从github上,下载sentinel的源码到本地;
https://github.com/alibaba/Sentinel/releases
导入idea工程
1.将lg-parent工程放入sentinel源码中导入idea
2. idea工程目录
3.工程启动
进入到sentinel-dashboard模块下,启动该模块下的
com.alibaba.csp.sentinel.dashboard.DashboardApplication类。
sentinel项目结构
先来看下整个sentinel项目结构
sentinel-core核心模块,限流、降级、系统保护等都在这里实现
sentinel-dashboard控制台模块,可以对连接上的sentinel客户端实现可视化的管理
sentinel-transport传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
sentinel-extension扩展模块,主要对DataSource进行了部分扩展实现
sentinel-adapter适配器模块,主要实现了对一些常见框架的适配
sentinel-demo样例模块,可参考怎么使用sentinel进行限流、降级等
sentinel-benchmark基准测试模块,对核心代码的精确性提供基准测试
2. sentinel源码
2.1客户端服务注册
2.1.1客户端服务注册流程分析
2.1.2主要源码分析
1.导入sentinel的起步依赖后,会加载spring-cloud-alibaba-sentinel-2.1.0.RELEASE.jar下面的spring.factories文件,在文件中加载SentinelWebAutoConfiguration类,针对sentinel的自动配置类
2. SentinelWebAutoConfiguration类中声明了一个FilterRegistrationBean,在这个方法中会创建sentinel核心的一个过滤器CommonFilter
@Bean
@ConditionalOnProperty(
name = {"spring.cloud.sentinel.filter.enabled"},
matchIfMissing = true
)
public FilterRegistrationBean sentinelFilter() {
FilterRegistrationBean<Filter> registration = new FilterRegistrationBean();
// 获取sentinel的过滤器配置信息
com.alibaba.cloud.sentinel.SentinelProperties.Filter filterConfig = this.properties.getFilter();
if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
List<String> defaultPatterns = new ArrayList();
//设置过滤器拦截路径为/*,拦截所有请求
defaultPatterns.add("/*");
filterConfig.setUrlPatterns(defaultPatterns);
}
registration.addUrlPatterns((String[])filterConfig.getUrlPatterns().toArray(new String[0]));
//创建CommonFilter过滤器
Filter filter = new CommonFilter();
registration.setFilter(filter);
registration.setOrder(filterConfig.getOrder());
registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(this.properties.getHttpMethodSpecify()));
log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
return registration;
}
CommonFilter过滤器中doFilter方法会拦截所有请求 ```java @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest sRequest = (HttpServletRequest) request;
Entry urlEntry = null;
Entry httpMethodUrlEntry = null;
try {
//获取请求路径
String target = FilterUtil.filterTarget(sRequest);
// Clean and unify the URL.
// For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
// the amount of context and resources will exceed the threshold.
UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
if (urlCleaner != null) {
target = urlCleaner.clean(target);
}
// If you intend to exclude some URLs, you can convert the URLs to the empty string ""
// in the UrlCleaner implementation.
if (!StringUtil.isEmpty(target)) {
// Parse the request origin using registered origin parser.
String origin = parseOrigin(sRequest);
ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin);
//资源检查--限流/熔断等
urlEntry = SphU.entry(target, EntryType.IN);
// Add method specification if necessary
if (httpMethodSpecify) {
httpMethodUrlEntry = SphU.entry(sRequest.getMethod().toUpperCase() + COLON + target,
EntryType.IN);
}
}
chain.doFilter(request, response);
} catch (BlockException e) {
HttpServletResponse sResponse = (HttpServletResponse) response;
// Return the block page, or redirect to another URL.
WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
} catch (IOException | ServletException | RuntimeException e2) {
Tracer.traceEntry(e2, urlEntry);
Tracer.traceEntry(e2, httpMethodUrlEntry);
throw e2;
} finally {
if (httpMethodUrlEntry != null) {
httpMethodUrlEntry.exit();
}
if (urlEntry != null) {
urlEntry.exit();
}
ContextUtil.exit();
}
}
4. SphU.entry(target, EntryType.IN)方法进行资源初始化,限流 熔断等操作
```java
public static Entry entry(String name, EntryType type) throws BlockException {
// Env类中有静态方法会被调用
return Env.sph.entry(name, type, 1, OBJECTS0);
}
Env类 ```java public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
6. doInit方法
```java
public static void doInit() {
//判断是否是第一次初始化,不是则直接返回
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
//此处去加载"META-INF/services/"目录下所配置的所有实现了InitFunc接口的类
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
//将加载完的所有实现类排序
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
//执行每个InitFunc实现类的init()方法,init()方法又会去加载其它所需资源
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
} catch (Error error) {
}
}
w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类 HeartbeatSenderInitFunc完成客户端服务心跳发送 ```java /**
- 心跳信号发送器的全局初始化类 *
@author Eric Zhao */ @InitOrder(-1) public class HeartbeatSenderInitFunc implements InitFunc {
private ScheduledExecutorService pool = null;
private void initSchedulerIfNeeded() { if (pool == null) {
pool = new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("sentinel-heartbeat-send-task", true),
new DiscardOldestPolicy());
} }
@Override public void init() { HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender(); if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
initSchedulerIfNeeded(); //设置心跳任务发送间隔时间 默认10s发送一次 long interval = retrieveInterval(sender); setIntervalIfNotExists(interval); //启动心跳任务 scheduleHeartbeatTask(sender, interval); }
private boolean isValidHeartbeatInterval(Long interval) { return interval != null && interval > 0; }
private void setIntervalIfNotExists(long interval) { SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval)); }
long retrieveInterval(/@NonNull/ HeartbeatSender sender) { Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs(); if (isValidHeartbeatInterval(intervalInConfig)) {
RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval "
+ "in Sentinel config property: " + intervalInConfig);
return intervalInConfig;
} else {
long senderInterval = sender.intervalMs();
RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in "
+ "config property or invalid, using sender default: " + senderInterval);
return senderInterval;
} }
private void scheduleHeartbeatTask(/@NonNull/ final HeartbeatSender sender, /@Valid/ long interval) { pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//发送心跳
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS); RecordLog.info(“[HeartbeatSenderInit] HeartbeatSender started: “
+ sender.getClass().getCanonicalName());
} }
8. sender.sendHeartbeat();方法<br />
9.通过发送/registry/machine最终会到达sentinel服务的MachineRegistryController的receiveHeartBeat方法
```java
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.dashboard.controller;
import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.dashboard.discovery.MachineDiscovery;
import com.alibaba.csp.sentinel.dashboard.discovery.MachineInfo;
import com.alibaba.csp.sentinel.dashboard.domain.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
/*
* 注册应用处理器
* */
@Controller
@RequestMapping(value = "/registry", produces = MediaType.APPLICATION_JSON_VALUE)
public class MachineRegistryController {
private final Logger logger = LoggerFactory.getLogger(MachineRegistryController.class);
@Autowired
private AppManagement appManagement;
/**
* 注册应用信息
* */
@ResponseBody
@RequestMapping("/machine")
public Result<?> receiveHeartBeat(String app, @RequestParam(value = "app_type", required = false, defaultValue = "0") Integer appType, Long version, String v, String hostname, String ip, Integer port) {
if (app == null) {
app = MachineDiscovery.UNKNOWN_APP_NAME;
}
if (ip == null) {
return Result.ofFail(-1, "ip can't be null");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
if (port == -1) {
logger.info("Receive heartbeat from " + ip + " but port not set yet");
return Result.ofFail(-1, "your port not set yet");
}
String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v;
version = version == null ? System.currentTimeMillis() : version;
try {
MachineInfo machineInfo = new MachineInfo();
machineInfo.setApp(app);
machineInfo.setAppType(appType);
machineInfo.setHostname(hostname);
machineInfo.setIp(ip);
machineInfo.setPort(port);
machineInfo.setHeartbeatVersion(version);
machineInfo.setLastHeartbeat(System.currentTimeMillis());
machineInfo.setVersion(sentinelVersion);
// 将接受到的应用信息添加到应用程序管理appManagement
appManagement.addMachine(machineInfo);
return Result.ofSuccessMsg("success");
} catch (Exception e) {
logger.error("Receive heartbeat error", e);
return Result.ofFail(-1, e.getMessage());
}
}
}
2.2 客户端请求处理
在sentinel中,一些数据存储,限流规则等都是在客户端存储的,那么客户端是怎么处理sentinel发送过来的请求呢?
2.2.1客户端请求处理流程分析
2.2.1 主要源码跟踪
w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类CommandCenterInitFunc完成sentinel服务端发送过来的请求相关操作
2. commandCenter.beforeStart()注册处理器,会将所有的处理器(定义在业务方法中指定熔断或降级的处理器类)进行注册,以key-value的形式存入handlerMap中
3. commandCenter.start();启动命令中心 ```java @Override public void start() throws Exception {int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
//从配置文件中获取端口,如果没有配置设置默认端口8719
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
// 获取可用的端口用以创建一个ServerSocket 如果端口被占用 则重试三次 每次端口号+1
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
// 在主线程中启动ServerThread用以接收socket请求
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
/**
- Get a server socket from an available port from a base port.
- Increasing on port number will occur when the port has already been used. *
- @param basePort base port to start
- @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
} return null; }try {
//如果发现端口占用情况,则尝试3次,每次端口号加1
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
ServerThread: 客户端监听到来自服务端的连接 创建一个业务线程去处理
```java
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
//Socket监听
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
// 将接收到的socket封装到HttpEventTask中由业务线程去处理 主线程不处理
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
HttpEventTask类处理sentinel发送过来的请求信息
commandHandler.handle(request)处理请求,例如sentinel发送过来的是/setRules,则调用ModifyRulesCommandHandler
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {
@Override
public CommandResponse<String> handle(CommandRequest request) {
//获取规则类型
String type = request.getParam("type");
// rule data in get parameter
//获取规则数据
String data = request.getParam("data");
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
RecordLog.info(String.format("Receiving rule change (type: %s): %s", type, data));
String result = "success";
//限流
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {//授权
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {//熔断
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {//系统规则
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
}
6.假如type是限流则调用FlowRuleManager.loadRules(flowRules)去加载限流规则
7. FlowPropertyListener类的configUpdate方法
8. buildFlowRuleMap方法generateRater(rule)方法
2.3 sentinel限流分析
在刚才源码中我们知道客户端是如何处理sentinel发送过来的限流,那么我们看看客户端是如何完成限流操作的
2.3.1限流流程分析
2.3.2主要源码跟踪
1. SphU.entry(target, EntryType.IN)代码完成限流/熔断等操作
2.经过调用最终会调用CtSph的entryWithPriority方法
lookProcessChain方法构建链路
SlotChainProvider.newSlotChain()
builder.build() 依次在链路上加入不同的插槽
4. chain.entry方法开启链路调用,会对链路中每个进行逐一调用,一直到到FlowSlotcheckFlow限流规则检查
ruleProvider.apply(resource.getName());canPassCheck()判断规则是否通过
DefaultController默认拒接策略