1. sentinel源码工程搭建
环境准备
在nacos的官网介绍中,sentinel源码运行,需要的java运行环境有:
JDK 1.8+
Maven 3.2+
源码构建
源码下载
从github上,下载sentinel的源码到本地;
https://github.com/alibaba/Sentinel/releases
导入idea工程
1.将lg-parent工程放入sentinel源码中导入idea
2. idea工程目录
3.工程启动
进入到sentinel-dashboard模块下,启动该模块下的
com.alibaba.csp.sentinel.dashboard.DashboardApplication类。
sentinel项目结构
先来看下整个sentinel项目结构
sentinel-core核心模块,限流、降级、系统保护等都在这里实现
sentinel-dashboard控制台模块,可以对连接上的sentinel客户端实现可视化的管理
sentinel-transport传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
sentinel-extension扩展模块,主要对DataSource进行了部分扩展实现
sentinel-adapter适配器模块,主要实现了对一些常见框架的适配
sentinel-demo样例模块,可参考怎么使用sentinel进行限流、降级等
sentinel-benchmark基准测试模块,对核心代码的精确性提供基准测试
2. sentinel源码
2.1客户端服务注册
2.1.1客户端服务注册流程分析
2.1.2主要源码分析
1.导入sentinel的起步依赖后,会加载spring-cloud-alibaba-sentinel-2.1.0.RELEASE.jar下面的spring.factories文件,在文件中加载SentinelWebAutoConfiguration类,针对sentinel的自动配置类
2. SentinelWebAutoConfiguration类中声明了一个FilterRegistrationBean,在这个方法中会创建sentinel核心的一个过滤器CommonFilter
@Bean@ConditionalOnProperty(name = {"spring.cloud.sentinel.filter.enabled"},matchIfMissing = true)public FilterRegistrationBean sentinelFilter() {FilterRegistrationBean<Filter> registration = new FilterRegistrationBean();// 获取sentinel的过滤器配置信息com.alibaba.cloud.sentinel.SentinelProperties.Filter filterConfig = this.properties.getFilter();if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {List<String> defaultPatterns = new ArrayList();//设置过滤器拦截路径为/*,拦截所有请求defaultPatterns.add("/*");filterConfig.setUrlPatterns(defaultPatterns);}registration.addUrlPatterns((String[])filterConfig.getUrlPatterns().toArray(new String[0]));//创建CommonFilter过滤器Filter filter = new CommonFilter();registration.setFilter(filter);registration.setOrder(filterConfig.getOrder());registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(this.properties.getHttpMethodSpecify()));log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());return registration;}
CommonFilter过滤器中doFilter方法会拦截所有请求 ```java @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {HttpServletRequest sRequest = (HttpServletRequest) request;Entry urlEntry = null;Entry httpMethodUrlEntry = null;try {//获取请求路径String target = FilterUtil.filterTarget(sRequest);// Clean and unify the URL.// For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or// the amount of context and resources will exceed the threshold.UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();if (urlCleaner != null) {target = urlCleaner.clean(target);}// If you intend to exclude some URLs, you can convert the URLs to the empty string ""// in the UrlCleaner implementation.if (!StringUtil.isEmpty(target)) {// Parse the request origin using registered origin parser.String origin = parseOrigin(sRequest);ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin);//资源检查--限流/熔断等urlEntry = SphU.entry(target, EntryType.IN);// Add method specification if necessaryif (httpMethodSpecify) {httpMethodUrlEntry = SphU.entry(sRequest.getMethod().toUpperCase() + COLON + target,EntryType.IN);}}chain.doFilter(request, response);} catch (BlockException e) {HttpServletResponse sResponse = (HttpServletResponse) response;// Return the block page, or redirect to another URL.WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);} catch (IOException | ServletException | RuntimeException e2) {Tracer.traceEntry(e2, urlEntry);Tracer.traceEntry(e2, httpMethodUrlEntry);throw e2;} finally {if (httpMethodUrlEntry != null) {httpMethodUrlEntry.exit();}if (urlEntry != null) {urlEntry.exit();}ContextUtil.exit();}
}
4. SphU.entry(target, EntryType.IN)方法进行资源初始化,限流 熔断等操作```javapublic static Entry entry(String name, EntryType type) throws BlockException {// Env类中有静态方法会被调用return Env.sph.entry(name, type, 1, OBJECTS0);}
Env类 ```java public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.InitExecutor.doInit();
}
}
6. doInit方法```javapublic static void doInit() {//判断是否是第一次初始化,不是则直接返回if (!initialized.compareAndSet(false, true)) {return;}try {//此处去加载"META-INF/services/"目录下所配置的所有实现了InitFunc接口的类ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : loader) {//将加载完的所有实现类排序insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {//执行每个InitFunc实现类的init()方法,init()方法又会去加载其它所需资源w.func.init();RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",w.func.getClass().getCanonicalName(), w.order));}} catch (Exception ex) {} catch (Error error) {}}
w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类 HeartbeatSenderInitFunc完成客户端服务心跳发送 ```java /**
- 心跳信号发送器的全局初始化类 *
@author Eric Zhao */ @InitOrder(-1) public class HeartbeatSenderInitFunc implements InitFunc {
private ScheduledExecutorService pool = null;
private void initSchedulerIfNeeded() { if (pool == null) {
pool = new ScheduledThreadPoolExecutor(2,new NamedThreadFactory("sentinel-heartbeat-send-task", true),new DiscardOldestPolicy());
} }
@Override public void init() { HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender(); if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");return;
}
initSchedulerIfNeeded(); //设置心跳任务发送间隔时间 默认10s发送一次 long interval = retrieveInterval(sender); setIntervalIfNotExists(interval); //启动心跳任务 scheduleHeartbeatTask(sender, interval); }
private boolean isValidHeartbeatInterval(Long interval) { return interval != null && interval > 0; }
private void setIntervalIfNotExists(long interval) { SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval)); }
long retrieveInterval(/@NonNull/ HeartbeatSender sender) { Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs(); if (isValidHeartbeatInterval(intervalInConfig)) {
RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval "+ "in Sentinel config property: " + intervalInConfig);return intervalInConfig;
} else {
long senderInterval = sender.intervalMs();RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in "+ "config property or invalid, using sender default: " + senderInterval);return senderInterval;
} }
private void scheduleHeartbeatTask(/@NonNull/ final HeartbeatSender sender, /@Valid/ long interval) { pool.scheduleAtFixedRate(new Runnable() {
@Overridepublic void run() {try {//发送心跳sender.sendHeartbeat();} catch (Throwable e) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);}}
}, 5000, interval, TimeUnit.MILLISECONDS); RecordLog.info(“[HeartbeatSenderInit] HeartbeatSender started: “
+ sender.getClass().getCanonicalName());
} }
8. sender.sendHeartbeat();方法<br />9.通过发送/registry/machine最终会到达sentinel服务的MachineRegistryController的receiveHeartBeat方法```java/** Copyright 1999-2018 Alibaba Group Holding Ltd.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.csp.sentinel.dashboard.controller;import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement;import com.alibaba.csp.sentinel.util.StringUtil;import com.alibaba.csp.sentinel.dashboard.discovery.MachineDiscovery;import com.alibaba.csp.sentinel.dashboard.discovery.MachineInfo;import com.alibaba.csp.sentinel.dashboard.domain.Result;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.MediaType;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseBody;/** 注册应用处理器* */@Controller@RequestMapping(value = "/registry", produces = MediaType.APPLICATION_JSON_VALUE)public class MachineRegistryController {private final Logger logger = LoggerFactory.getLogger(MachineRegistryController.class);@Autowiredprivate AppManagement appManagement;/*** 注册应用信息* */@ResponseBody@RequestMapping("/machine")public Result<?> receiveHeartBeat(String app, @RequestParam(value = "app_type", required = false, defaultValue = "0") Integer appType, Long version, String v, String hostname, String ip, Integer port) {if (app == null) {app = MachineDiscovery.UNKNOWN_APP_NAME;}if (ip == null) {return Result.ofFail(-1, "ip can't be null");}if (port == null) {return Result.ofFail(-1, "port can't be null");}if (port == -1) {logger.info("Receive heartbeat from " + ip + " but port not set yet");return Result.ofFail(-1, "your port not set yet");}String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v;version = version == null ? System.currentTimeMillis() : version;try {MachineInfo machineInfo = new MachineInfo();machineInfo.setApp(app);machineInfo.setAppType(appType);machineInfo.setHostname(hostname);machineInfo.setIp(ip);machineInfo.setPort(port);machineInfo.setHeartbeatVersion(version);machineInfo.setLastHeartbeat(System.currentTimeMillis());machineInfo.setVersion(sentinelVersion);// 将接受到的应用信息添加到应用程序管理appManagementappManagement.addMachine(machineInfo);return Result.ofSuccessMsg("success");} catch (Exception e) {logger.error("Receive heartbeat error", e);return Result.ofFail(-1, e.getMessage());}}}
2.2 客户端请求处理
在sentinel中,一些数据存储,限流规则等都是在客户端存储的,那么客户端是怎么处理sentinel发送过来的请求呢?
2.2.1客户端请求处理流程分析

2.2.1 主要源码跟踪
w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类CommandCenterInitFunc完成sentinel服务端发送过来的请求相关操作

2. commandCenter.beforeStart()注册处理器,会将所有的处理器(定义在业务方法中指定熔断或降级的处理器类)进行注册,以key-value的形式存入handlerMap中
3. commandCenter.start();启动命令中心 ```java @Override public void start() throws Exception {int nThreads = Runtime.getRuntime().availableProcessors();this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new NamedThreadFactory("sentinel-command-center-service-executor"),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {CommandCenterLog.info("EventTask rejected");throw new RejectedExecutionException();}});Runnable serverInitTask = new Runnable() {int port;{try {//从配置文件中获取端口,如果没有配置设置默认端口8719port = Integer.parseInt(TransportConfig.getPort());} catch (Exception e) {port = DEFAULT_PORT;}}@Overridepublic void run() {boolean success = false;// 获取可用的端口用以创建一个ServerSocket 如果端口被占用 则重试三次 每次端口号+1ServerSocket serverSocket = getServerSocketFromBasePort(port);if (serverSocket != null) {CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());socketReference = serverSocket;// 在主线程中启动ServerThread用以接收socket请求executor.submit(new ServerThread(serverSocket));success = true;port = serverSocket.getLocalPort();} else {CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");}if (!success) {port = PORT_UNINITIALIZED;}TransportConfig.setRuntimePort(port);executor.shutdown();}};new Thread(serverInitTask).start();
}
/**
- Get a server socket from an available port from a base port.
- Increasing on port number will occur when the port has already been used. *
- @param basePort base port to start
- @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
} return null; }try {//如果发现端口占用情况,则尝试3次,每次端口号加1ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);server.setReuseAddress(true);return server;} catch (IOException e) {tryCount++;try {TimeUnit.MILLISECONDS.sleep(30);} catch (InterruptedException e1) {break;}}
ServerThread: 客户端监听到来自服务端的连接 创建一个业务线程去处理```javaclass ServerThread extends Thread {private ServerSocket serverSocket;ServerThread(ServerSocket s) {this.serverSocket = s;setName("sentinel-courier-server-accept-thread");}@Overridepublic void run() {while (true) {Socket socket = null;try {//Socket监听socket = this.serverSocket.accept();setSocketSoTimeout(socket);// 将接收到的socket封装到HttpEventTask中由业务线程去处理 主线程不处理HttpEventTask eventTask = new HttpEventTask(socket);bizExecutor.submit(eventTask);} catch (Exception e) {CommandCenterLog.info("Server error", e);if (socket != null) {try {socket.close();} catch (Exception e1) {CommandCenterLog.info("Error when closing an opened socket", e1);}}try {// In case of infinite log.Thread.sleep(10);} catch (InterruptedException e1) {// Indicates the task should stop.break;}}}}}
HttpEventTask类处理sentinel发送过来的请求信息

commandHandler.handle(request)处理请求,例如sentinel发送过来的是/setRules,则调用ModifyRulesCommandHandler
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")public class ModifyRulesCommandHandler implements CommandHandler<String> {@Overridepublic CommandResponse<String> handle(CommandRequest request) {//获取规则类型String type = request.getParam("type");// rule data in get parameter//获取规则数据String data = request.getParam("data");if (StringUtil.isNotEmpty(data)) {try {data = URLDecoder.decode(data, "utf-8");} catch (Exception e) {RecordLog.info("Decode rule data error", e);return CommandResponse.ofFailure(e, "decode rule data error");}}RecordLog.info(String.format("Receiving rule change (type: %s): %s", type, data));String result = "success";//限流if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);FlowRuleManager.loadRules(flowRules);if (!writeToDataSource(getFlowDataSource(), flowRules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {//授权List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);AuthorityRuleManager.loadRules(rules);if (!writeToDataSource(getAuthorityDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {//熔断List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);DegradeRuleManager.loadRules(rules);if (!writeToDataSource(getDegradeDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {//系统规则List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);SystemRuleManager.loadRules(rules);if (!writeToDataSource(getSystemSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);}return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));}}
6.假如type是限流则调用FlowRuleManager.loadRules(flowRules)去加载限流规则

7. FlowPropertyListener类的configUpdate方法
8. buildFlowRuleMap方法

generateRater(rule)方法


2.3 sentinel限流分析
在刚才源码中我们知道客户端是如何处理sentinel发送过来的限流,那么我们看看客户端是如何完成限流操作的
2.3.1限流流程分析

2.3.2主要源码跟踪
1. SphU.entry(target, EntryType.IN)代码完成限流/熔断等操作

2.经过调用最终会调用CtSph的entryWithPriority方法

lookProcessChain方法构建链路

SlotChainProvider.newSlotChain()
builder.build() 依次在链路上加入不同的插槽

4. chain.entry方法开启链路调用,会对链路中每个进行逐一调用,一直到到FlowSlot
checkFlow限流规则检查

ruleProvider.apply(resource.getName());
canPassCheck()判断规则是否通过


DefaultController默认拒接策略



