1. sentinel源码工程搭建

环境准备
在nacos的官网介绍中,sentinel源码运行,需要的java运行环境有:
JDK 1.8+
Maven 3.2+
源码构建
源码下载
从github上,下载sentinel的源码到本地;
https://github.com/alibaba/Sentinel/releases

导入idea工程
1.将lg-parent工程放入sentinel源码中导入idea
2. idea工程目录
Sentinel源码分析 - 图1
3.工程启动
进入到sentinel-dashboard模块下,启动该模块下的
com.alibaba.csp.sentinel.dashboard.DashboardApplication类。
Sentinel源码分析 - 图2
sentinel项目结构
先来看下整个sentinel项目结构
sentinel-core核心模块,限流、降级、系统保护等都在这里实现
sentinel-dashboard控制台模块,可以对连接上的sentinel客户端实现可视化的管理
sentinel-transport传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
sentinel-extension扩展模块,主要对DataSource进行了部分扩展实现
sentinel-adapter适配器模块,主要实现了对一些常见框架的适配
sentinel-demo样例模块,可参考怎么使用sentinel进行限流、降级等
sentinel-benchmark基准测试模块,对核心代码的精确性提供基准测试

2. sentinel源码

2.1客户端服务注册

2.1.1客户端服务注册流程分析

Sentinel源码分析 - 图3

2.1.2主要源码分析

1.导入sentinel的起步依赖后,会加载spring-cloud-alibaba-sentinel-2.1.0.RELEASE.jar下面的spring.factories文件,在文件中加载SentinelWebAutoConfiguration类,针对sentinel的自动配置类
Sentinel源码分析 - 图4
2. SentinelWebAutoConfiguration类中声明了一个FilterRegistrationBean,在这个方法中会创建sentinel核心的一个过滤器CommonFilter

  1. @Bean
  2. @ConditionalOnProperty(
  3. name = {"spring.cloud.sentinel.filter.enabled"},
  4. matchIfMissing = true
  5. )
  6. public FilterRegistrationBean sentinelFilter() {
  7. FilterRegistrationBean<Filter> registration = new FilterRegistrationBean();
  8. // 获取sentinel的过滤器配置信息
  9. com.alibaba.cloud.sentinel.SentinelProperties.Filter filterConfig = this.properties.getFilter();
  10. if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
  11. List<String> defaultPatterns = new ArrayList();
  12. //设置过滤器拦截路径为/*,拦截所有请求
  13. defaultPatterns.add("/*");
  14. filterConfig.setUrlPatterns(defaultPatterns);
  15. }
  16. registration.addUrlPatterns((String[])filterConfig.getUrlPatterns().toArray(new String[0]));
  17. //创建CommonFilter过滤器
  18. Filter filter = new CommonFilter();
  19. registration.setFilter(filter);
  20. registration.setOrder(filterConfig.getOrder());
  21. registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(this.properties.getHttpMethodSpecify()));
  22. log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
  23. return registration;
  24. }
  1. CommonFilter过滤器中doFilter方法会拦截所有请求 ```java @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)

    1. throws IOException, ServletException {
    2. HttpServletRequest sRequest = (HttpServletRequest) request;
    3. Entry urlEntry = null;
    4. Entry httpMethodUrlEntry = null;
    5. try {
    6. //获取请求路径
    7. String target = FilterUtil.filterTarget(sRequest);
    8. // Clean and unify the URL.
    9. // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
    10. // the amount of context and resources will exceed the threshold.
    11. UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
    12. if (urlCleaner != null) {
    13. target = urlCleaner.clean(target);
    14. }
    15. // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
    16. // in the UrlCleaner implementation.
    17. if (!StringUtil.isEmpty(target)) {
    18. // Parse the request origin using registered origin parser.
    19. String origin = parseOrigin(sRequest);
    20. ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin);
    21. //资源检查--限流/熔断等
    22. urlEntry = SphU.entry(target, EntryType.IN);
    23. // Add method specification if necessary
    24. if (httpMethodSpecify) {
    25. httpMethodUrlEntry = SphU.entry(sRequest.getMethod().toUpperCase() + COLON + target,
    26. EntryType.IN);
    27. }
    28. }
    29. chain.doFilter(request, response);
    30. } catch (BlockException e) {
    31. HttpServletResponse sResponse = (HttpServletResponse) response;
    32. // Return the block page, or redirect to another URL.
    33. WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
    34. } catch (IOException | ServletException | RuntimeException e2) {
    35. Tracer.traceEntry(e2, urlEntry);
    36. Tracer.traceEntry(e2, httpMethodUrlEntry);
    37. throw e2;
    38. } finally {
    39. if (httpMethodUrlEntry != null) {
    40. httpMethodUrlEntry.exit();
    41. }
    42. if (urlEntry != null) {
    43. urlEntry.exit();
    44. }
    45. ContextUtil.exit();
    46. }

    }

  1. 4. SphU.entry(target, EntryType.IN)方法进行资源初始化,限流 熔断等操作
  2. ```java
  3. public static Entry entry(String name, EntryType type) throws BlockException {
  4. // Env类中有静态方法会被调用
  5. return Env.sph.entry(name, type, 1, OBJECTS0);
  6. }
  1. Env类 ```java public class Env {

    public static final Sph sph = new CtSph();

    static {

    1. // If init fails, the process will exit.
    2. InitExecutor.doInit();

    }

}

  1. 6. doInit方法
  2. ```java
  3. public static void doInit() {
  4. //判断是否是第一次初始化,不是则直接返回
  5. if (!initialized.compareAndSet(false, true)) {
  6. return;
  7. }
  8. try {
  9. //此处去加载"META-INF/services/"目录下所配置的所有实现了InitFunc接口的类
  10. ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
  11. List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
  12. for (InitFunc initFunc : loader) {
  13. //将加载完的所有实现类排序
  14. insertSorted(initList, initFunc);
  15. }
  16. for (OrderWrapper w : initList) {
  17. //执行每个InitFunc实现类的init()方法,init()方法又会去加载其它所需资源
  18. w.func.init();
  19. RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
  20. w.func.getClass().getCanonicalName(), w.order));
  21. }
  22. } catch (Exception ex) {
  23. } catch (Error error) {
  24. }
  25. }
  1. w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类 HeartbeatSenderInitFunc完成客户端服务心跳发送 ```java /**

    • 心跳信号发送器的全局初始化类 *
    • @author Eric Zhao */ @InitOrder(-1) public class HeartbeatSenderInitFunc implements InitFunc {

      private ScheduledExecutorService pool = null;

      private void initSchedulerIfNeeded() { if (pool == null) {

      1. pool = new ScheduledThreadPoolExecutor(2,
      2. new NamedThreadFactory("sentinel-heartbeat-send-task", true),
      3. new DiscardOldestPolicy());

      } }

      @Override public void init() { HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender(); if (sender == null) {

      1. RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
      2. return;

      }

      initSchedulerIfNeeded(); //设置心跳任务发送间隔时间 默认10s发送一次 long interval = retrieveInterval(sender); setIntervalIfNotExists(interval); //启动心跳任务 scheduleHeartbeatTask(sender, interval); }

      private boolean isValidHeartbeatInterval(Long interval) { return interval != null && interval > 0; }

      private void setIntervalIfNotExists(long interval) { SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval)); }

      long retrieveInterval(/@NonNull/ HeartbeatSender sender) { Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs(); if (isValidHeartbeatInterval(intervalInConfig)) {

      1. RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval "
      2. + "in Sentinel config property: " + intervalInConfig);
      3. return intervalInConfig;

      } else {

      1. long senderInterval = sender.intervalMs();
      2. RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in "
      3. + "config property or invalid, using sender default: " + senderInterval);
      4. return senderInterval;

      } }

      private void scheduleHeartbeatTask(/@NonNull/ final HeartbeatSender sender, /@Valid/ long interval) { pool.scheduleAtFixedRate(new Runnable() {

      1. @Override
      2. public void run() {
      3. try {
      4. //发送心跳
      5. sender.sendHeartbeat();
      6. } catch (Throwable e) {
      7. RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
      8. }
      9. }

      }, 5000, interval, TimeUnit.MILLISECONDS); RecordLog.info(“[HeartbeatSenderInit] HeartbeatSender started: “

      1. + sender.getClass().getCanonicalName());

      } }

  1. 8. sender.sendHeartbeat();方法<br />![](https://cdn.nlark.com/yuque/0/2021/png/12497888/1618667042566-c2c139ce-4b67-4682-9dfc-4b29eb09b05d.png#clientId=uba14a92f-5cad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=538&id=ue335e04a&margin=%5Bobject%20Object%5D&originHeight=1076&originWidth=1584&originalType=binary&ratio=1&rotation=0&showTitle=false&size=228386&status=done&style=none&taskId=u73ba7ca5-febd-481d-a155-c309bd3c0cb&title=&width=792)
  2. 9.通过发送/registry/machine最终会到达sentinel服务的MachineRegistryControllerreceiveHeartBeat方法
  3. ```java
  4. /*
  5. * Copyright 1999-2018 Alibaba Group Holding Ltd.
  6. *
  7. * Licensed under the Apache License, Version 2.0 (the "License");
  8. * you may not use this file except in compliance with the License.
  9. * You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS,
  15. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. * See the License for the specific language governing permissions and
  17. * limitations under the License.
  18. */
  19. package com.alibaba.csp.sentinel.dashboard.controller;
  20. import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement;
  21. import com.alibaba.csp.sentinel.util.StringUtil;
  22. import com.alibaba.csp.sentinel.dashboard.discovery.MachineDiscovery;
  23. import com.alibaba.csp.sentinel.dashboard.discovery.MachineInfo;
  24. import com.alibaba.csp.sentinel.dashboard.domain.Result;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.http.MediaType;
  29. import org.springframework.stereotype.Controller;
  30. import org.springframework.web.bind.annotation.RequestMapping;
  31. import org.springframework.web.bind.annotation.RequestParam;
  32. import org.springframework.web.bind.annotation.ResponseBody;
  33. /*
  34. * 注册应用处理器
  35. * */
  36. @Controller
  37. @RequestMapping(value = "/registry", produces = MediaType.APPLICATION_JSON_VALUE)
  38. public class MachineRegistryController {
  39. private final Logger logger = LoggerFactory.getLogger(MachineRegistryController.class);
  40. @Autowired
  41. private AppManagement appManagement;
  42. /**
  43. * 注册应用信息
  44. * */
  45. @ResponseBody
  46. @RequestMapping("/machine")
  47. public Result<?> receiveHeartBeat(String app, @RequestParam(value = "app_type", required = false, defaultValue = "0") Integer appType, Long version, String v, String hostname, String ip, Integer port) {
  48. if (app == null) {
  49. app = MachineDiscovery.UNKNOWN_APP_NAME;
  50. }
  51. if (ip == null) {
  52. return Result.ofFail(-1, "ip can't be null");
  53. }
  54. if (port == null) {
  55. return Result.ofFail(-1, "port can't be null");
  56. }
  57. if (port == -1) {
  58. logger.info("Receive heartbeat from " + ip + " but port not set yet");
  59. return Result.ofFail(-1, "your port not set yet");
  60. }
  61. String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v;
  62. version = version == null ? System.currentTimeMillis() : version;
  63. try {
  64. MachineInfo machineInfo = new MachineInfo();
  65. machineInfo.setApp(app);
  66. machineInfo.setAppType(appType);
  67. machineInfo.setHostname(hostname);
  68. machineInfo.setIp(ip);
  69. machineInfo.setPort(port);
  70. machineInfo.setHeartbeatVersion(version);
  71. machineInfo.setLastHeartbeat(System.currentTimeMillis());
  72. machineInfo.setVersion(sentinelVersion);
  73. // 将接受到的应用信息添加到应用程序管理appManagement
  74. appManagement.addMachine(machineInfo);
  75. return Result.ofSuccessMsg("success");
  76. } catch (Exception e) {
  77. logger.error("Receive heartbeat error", e);
  78. return Result.ofFail(-1, e.getMessage());
  79. }
  80. }
  81. }

2.2 客户端请求处理

在sentinel中,一些数据存储,限流规则等都是在客户端存储的,那么客户端是怎么处理sentinel发送过来的请求呢?

2.2.1客户端请求处理流程分析

Sentinel源码分析 - 图5

2.2.1 主要源码跟踪

  1. w.func.init()方法会执行每个InitFunc实现类的init()方法,其中有一个实现类CommandCenterInitFunc完成sentinel服务端发送过来的请求相关操作
    Sentinel源码分析 - 图6
    2. commandCenter.beforeStart()注册处理器,会将所有的处理器(定义在业务方法中指定熔断或降级的处理器类)进行注册,以key-value的形式存入handlerMap中
    Sentinel源码分析 - 图7
    3. commandCenter.start();启动命令中心 ```java @Override public void start() throws Exception {

    1. int nThreads = Runtime.getRuntime().availableProcessors();
    2. this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    3. new ArrayBlockingQueue<Runnable>(10),
    4. new NamedThreadFactory("sentinel-command-center-service-executor"),
    5. new RejectedExecutionHandler() {
    6. @Override
    7. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    8. CommandCenterLog.info("EventTask rejected");
    9. throw new RejectedExecutionException();
    10. }
    11. });
    12. Runnable serverInitTask = new Runnable() {
    13. int port;
    14. {
    15. try {
    16. //从配置文件中获取端口,如果没有配置设置默认端口8719
    17. port = Integer.parseInt(TransportConfig.getPort());
    18. } catch (Exception e) {
    19. port = DEFAULT_PORT;
    20. }
    21. }
    22. @Override
    23. public void run() {
    24. boolean success = false;
    25. // 获取可用的端口用以创建一个ServerSocket 如果端口被占用 则重试三次 每次端口号+1
    26. ServerSocket serverSocket = getServerSocketFromBasePort(port);
    27. if (serverSocket != null) {
    28. CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
    29. socketReference = serverSocket;
    30. // 在主线程中启动ServerThread用以接收socket请求
    31. executor.submit(new ServerThread(serverSocket));
    32. success = true;
    33. port = serverSocket.getLocalPort();
    34. } else {
    35. CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
    36. }
    37. if (!success) {
    38. port = PORT_UNINITIALIZED;
    39. }
    40. TransportConfig.setRuntimePort(port);
    41. executor.shutdown();
    42. }
    43. };
    44. new Thread(serverInitTask).start();

    }

    /**

    • Get a server socket from an available port from a base port.
    • Increasing on port number will occur when the port has already been used. *
    • @param basePort base port to start
    • @return new socket with available port */ private static ServerSocket getServerSocketFromBasePort(int basePort) { int tryCount = 0; while (true) {
      1. try {
      2. //如果发现端口占用情况,则尝试3次,每次端口号加1
      3. ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
      4. server.setReuseAddress(true);
      5. return server;
      6. } catch (IOException e) {
      7. tryCount++;
      8. try {
      9. TimeUnit.MILLISECONDS.sleep(30);
      10. } catch (InterruptedException e1) {
      11. break;
      12. }
      13. }
      } return null; }
  1. ServerThread: 客户端监听到来自服务端的连接 创建一个业务线程去处理
  2. ```java
  3. class ServerThread extends Thread {
  4. private ServerSocket serverSocket;
  5. ServerThread(ServerSocket s) {
  6. this.serverSocket = s;
  7. setName("sentinel-courier-server-accept-thread");
  8. }
  9. @Override
  10. public void run() {
  11. while (true) {
  12. Socket socket = null;
  13. try {
  14. //Socket监听
  15. socket = this.serverSocket.accept();
  16. setSocketSoTimeout(socket);
  17. // 将接收到的socket封装到HttpEventTask中由业务线程去处理 主线程不处理
  18. HttpEventTask eventTask = new HttpEventTask(socket);
  19. bizExecutor.submit(eventTask);
  20. } catch (Exception e) {
  21. CommandCenterLog.info("Server error", e);
  22. if (socket != null) {
  23. try {
  24. socket.close();
  25. } catch (Exception e1) {
  26. CommandCenterLog.info("Error when closing an opened socket", e1);
  27. }
  28. }
  29. try {
  30. // In case of infinite log.
  31. Thread.sleep(10);
  32. } catch (InterruptedException e1) {
  33. // Indicates the task should stop.
  34. break;
  35. }
  36. }
  37. }
  38. }
  39. }
  1. HttpEventTask类处理sentinel发送过来的请求信息
    Sentinel源码分析 - 图8

  2. commandHandler.handle(request)处理请求,例如sentinel发送过来的是/setRules,则调用ModifyRulesCommandHandler

    1. @CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
    2. public class ModifyRulesCommandHandler implements CommandHandler<String> {
    3. @Override
    4. public CommandResponse<String> handle(CommandRequest request) {
    5. //获取规则类型
    6. String type = request.getParam("type");
    7. // rule data in get parameter
    8. //获取规则数据
    9. String data = request.getParam("data");
    10. if (StringUtil.isNotEmpty(data)) {
    11. try {
    12. data = URLDecoder.decode(data, "utf-8");
    13. } catch (Exception e) {
    14. RecordLog.info("Decode rule data error", e);
    15. return CommandResponse.ofFailure(e, "decode rule data error");
    16. }
    17. }
    18. RecordLog.info(String.format("Receiving rule change (type: %s): %s", type, data));
    19. String result = "success";
    20. //限流
    21. if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
    22. List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
    23. FlowRuleManager.loadRules(flowRules);
    24. if (!writeToDataSource(getFlowDataSource(), flowRules)) {
    25. result = WRITE_DS_FAILURE_MSG;
    26. }
    27. return CommandResponse.ofSuccess(result);
    28. } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {//授权
    29. List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
    30. AuthorityRuleManager.loadRules(rules);
    31. if (!writeToDataSource(getAuthorityDataSource(), rules)) {
    32. result = WRITE_DS_FAILURE_MSG;
    33. }
    34. return CommandResponse.ofSuccess(result);
    35. } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {//熔断
    36. List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
    37. DegradeRuleManager.loadRules(rules);
    38. if (!writeToDataSource(getDegradeDataSource(), rules)) {
    39. result = WRITE_DS_FAILURE_MSG;
    40. }
    41. return CommandResponse.ofSuccess(result);
    42. } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {//系统规则
    43. List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
    44. SystemRuleManager.loadRules(rules);
    45. if (!writeToDataSource(getSystemSource(), rules)) {
    46. result = WRITE_DS_FAILURE_MSG;
    47. }
    48. return CommandResponse.ofSuccess(result);
    49. }
    50. return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    51. }
    52. }

    6.假如type是限流则调用FlowRuleManager.loadRules(flowRules)去加载限流规则
    Sentinel源码分析 - 图9
    7. FlowPropertyListener类的configUpdate方法
    Sentinel源码分析 - 图10
    8. buildFlowRuleMap方法
    Sentinel源码分析 - 图11
    Sentinel源码分析 - 图12

  3. generateRater(rule)方法
    Sentinel源码分析 - 图13
    Sentinel源码分析 - 图14

2.3 sentinel限流分析

在刚才源码中我们知道客户端是如何处理sentinel发送过来的限流,那么我们看看客户端是如何完成限流操作的

2.3.1限流流程分析

Sentinel源码分析 - 图15

2.3.2主要源码跟踪
1. SphU.entry(target, EntryType.IN)代码完成限流/熔断等操作
Sentinel源码分析 - 图16
Sentinel源码分析 - 图17

2.经过调用最终会调用CtSph的entryWithPriority方法
Sentinel源码分析 - 图18
Sentinel源码分析 - 图19

  1. lookProcessChain方法构建链路
    Sentinel源码分析 - 图20
    SlotChainProvider.newSlotChain()
    Sentinel源码分析 - 图21
    builder.build() 依次在链路上加入不同的插槽
    Sentinel源码分析 - 图22
    Sentinel源码分析 - 图23
    4. chain.entry方法开启链路调用,会对链路中每个进行逐一调用,一直到到FlowSlot
    Sentinel源码分析 - 图24

  2. checkFlow限流规则检查
    Sentinel源码分析 - 图25
    ruleProvider.apply(resource.getName());
    Sentinel源码分析 - 图26

  3. canPassCheck()判断规则是否通过
    Sentinel源码分析 - 图27
    Sentinel源码分析 - 图28

  4. DefaultController默认拒接策略
    Sentinel源码分析 - 图29
    Sentinel源码分析 - 图30
    Sentinel源码分析 - 图31