1、Nacos源码构建

1.1 环境准备


在nacos的官⽹介绍中,nacos源码运⾏,需要的java运⾏环境有:
JDK 1.8+
Maven 3.2+

从github上,下载nacos的源码到本地: https://github.com/alibaba/nacos

导⼊idea⼯程
1. 导⼊
nacos源码分析 - 图1
2. 配置maven环境,下载jar包如果是阿⾥云⼤约在5分钟左右
nacos源码分析 - 图2
3. 编译⼯程
nacos源码分析 - 图3

1.2 源码运⾏

  1. ⼯程启动

进⼊到nacos-console模块下,启动该模块下的com.alibaba.nacos.Nacos类。
nacos源码分析 - 图4
但通常情况下,会报如下错误:
nacos源码分析 - 图5
这是由于nacos默认使⽤的是集群⽅式,启动时会到默认的配置路径下,寻找集群配置⽂件

cluster.conf。

我们源码运⾏时,通常使⽤的是单机模式,因此需要在启动参数中进⾏设置,在jvm的启动参数
中,添加

  1. -Dnacos.standalone=true

nacos源码分析 - 图6

  1. 配置数据库
    修改console模块中的配置⽂件application.properties⽂件
    1. #关闭认证缓存
    2. nacos.core.auth.caching.enabled=false
    3. #*************** Config Module Related Configurations ***************#
    4. ### If user MySQL as datasource:
    5. spring.datasource.platform=mysql
    6. ### Count of DB:
    7. db.num=1
    8. ### Connect URL of DB:
    9. db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8
    10. db.user=root
    11. db.password=root
    创建nacos数据库,并执⾏distribution模块中的SQL脚本
    nacos源码分析 - 图7

nacos项⽬结构
先来看下整个nacos项⽬结构
nacos源码分析 - 图8

  • address模块: 主要查询nacos集群中节点个数以及IP的列表.api模块: 主要给客户端调⽤的api接⼝的抽象.
  • common模块: 主要是通⽤的⼯具包和字符串常量的定义
  • client模块: 主要是对依赖api模块和common模块,对api的接⼝的实现,给nacos的客户端使⽤.
  • cmdb模块: 主要是操作的数据的存储在内存中,该模块提供⼀个查询数据标签的接⼝.
  • config模块: 主要是服务配置的管理,即配置中⼼, 提供api给客户端拉去配置信息,以及提供更新配置的,客户端通过⻓轮询的更新配置信息.数据存储是mysql.
  • naming模块: 主要是作为服务注册中⼼的实现模块,具备服务的注册和服务发现的功能.
  • console模块: 主要是实现控制台的功能.具有权限校验、服务状态、健康检查等功能.
  • core模块: 主要是实现Spring的PropertySource的后置处理器,⽤于属性加载,初始化,监听器相关操作
  • distribution模块: 主要是打包nacos-server的操作,使⽤maven-assembly-plugin进⾏⾃定义打包,

下⾯就是各个模块的依赖关系:
nacos源码分析 - 图9

2. nacos服务注册发现源码

2.1 @EnableDiscoveryClient 注解
1. @EnableDiscoveryClient注解

nacos源码分析 - 图10
EnableDiscoveryClien引⽤了EnableDiscoveryClientImportSelector类

  1. EnableDiscoveryClientImportSelector类
    nacos源码分析 - 图11
    nacos源码分析 - 图12
    3. 开启⾃动服务注册后会加载spring-cloud-alibaba-nacos-discovery-2.1.0.RELEASE-sources.jar!\META-INF\spring.factories⽂件中的

DiscoveryAutoConfiguration配置类,开启nacos服务⾃动注册
nacos源码分析 - 图13

2.2 nacos服务注册

2.2.1 服务注册流程分析

nacos源码分析 - 图14

2.2.2 主要源码跟踪

客户端操作
1. NacosDiscoveryAutoConfiguration类
nacos源码分析 - 图15
nacos源码分析 - 图16

  1. NacosAutoServiceRegistration继承AbstractAutoServiceRegistration,AbstractAutoServiceRegistration实现了ApplicationListener监听,所以会执⾏onApplicationEvent⽅法

nacos源码分析 - 图17
nacos源码分析 - 图18

com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration#register
nacos源码分析 - 图19=>org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
nacos源码分析 - 图20
=>com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
nacos源码分析 - 图21

  1. NacosNamingService的registerInstance⽅法
    nacos源码分析 - 图22

先看addBeatInfo⽅法
nacos源码分析 - 图23
nacos源码分析 - 图24
nacos源码分析 - 图25

registerService⽅法
nacos源码分析 - 图26

服务端接收请求
4. InstanceController类

nacos源码分析 - 图27
nacos源码分析 - 图28

心跳检测方法:
nacos源码分析 - 图29
nacos源码分析 - 图30
nacos源码分析 - 图31

  1. ClientBeatCheckTask类客户端⼼跳检查线程

    1. @Override
    2. public void run() {
    3. try {
    4. if (!getDistroMapper().responsible(service.getName())) {
    5. return;
    6. }
    7. if (!getSwitchDomain().isHealthCheckEnabled()) {
    8. return;
    9. }
    10. //1. 获取实例
    11. List<Instance> instances = service.allIPs(true);
    12. // first set health status of instances:
    13. //2.检查客户端实例最后使⽤时间是否超时
    14. for (Instance instance : instances) {
    15. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
    16. if (!instance.isMarked()) {
    17. if (instance.isHealthy()) {
    18. //3.如果超时15秒设置健康状态为false
    19. instance.setHealthy(false);
    20. Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
    21. instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
    22. UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
    23. getPushService().serviceChanged(service);
    24. SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
    25. }
    26. }
    27. }
    28. }
    29. if (!getGlobalConfig().isExpireInstance()) {
    30. return;
    31. }
    32. // then remove obsolete instances:
    33. for (Instance instance : instances) {
    34. if (instance.isMarked()) {
    35. continue;
    36. }
    37. // 4. 检查是否超过30秒
    38. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
    39. // delete instance
    40. Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
    41. // 5. 如果超过30秒则删除实例
    42. deleteIP(instance);
    43. }
    44. }
    45. } catch (Exception e) {
    46. Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    47. }
    48. }
  2. ClientBeatProcessor客户端⼼跳处理类

    1. @Override
    2. public void run() {
    3. Service service = this.service;
    4. if (Loggers.EVT_LOG.isDebugEnabled()) {
    5. Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    6. }
    7. String ip = rsInfo.getIp();
    8. String clusterName = rsInfo.getCluster();
    9. int port = rsInfo.getPort();
    10. Cluster cluster = service.getClusterMap().get(clusterName);
    11. //1.获取所有实例
    12. List<Instance> instances = cluster.allIPs(true);
    13. for (Instance instance : instances) {
    14. if (instance.getIp().equals(ip) && instance.getPort() == port) {
    15. if (Loggers.EVT_LOG.isDebugEnabled()) {
    16. Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
    17. }
    18. //2. 设置实例的最后使⽤时间
    19. instance.setLastBeat(System.currentTimeMillis());
    20. if (!instance.isMarked()) {
    21. if (!instance.isHealthy()) {
    22. instance.setHealthy(true);
    23. Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
    24. cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
    25. getPushService().serviceChanged(service);
    26. }
    27. }
    28. }
    29. }
    30. }

    2.3 nacos服务发现

    2.3.1服务发现流程分析

    nacos源码分析 - 图32

    2.3.2主要源码跟踪

    客户端代码
    基于Ribbon的服务发现DynamicServerListLoadBalancer
    nacos源码分析 - 图33

nacos源码分析 - 图34
先看上图中第一步的调用方法
=>com.netflix.loadbalancer.DynamicServerListLoadBalancer#enableAndInitLearnNewServersFeature
nacos源码分析 - 图35
跟踪进入start方法,线程内执行updateAction对象的doUpdate方法
nacos源码分析 - 图36
从start方法可以看到updateAction对象: 它是一个接口的匿名实现类
nacos源码分析 - 图37
这里就是执行restOrInit方法中第二步的更新服务列表的方法,只不过不是立即执行而是延迟一秒。

第二步直接调用updateListOfServers()是立即更新服务列表
=> com.netflix.loadbalancer.DynamicServerListLoadBalancer#updateListOfServers
serverListImpl.getUpdatedListOfServers()⽅法
nacos源码分析 - 图38
跟踪进入=>com.alibaba.cloud.nacos.ribbon.NacosServerList#getServers
nacos源码分析 - 图39
调用NacosNamingService 获取服务列表=> 调⽤hostReactor的getServiceInfo⽅法
nacos源码分析 - 图40
继续跟踪进入getServiceInfo
nacos源码分析 - 图41
nacos源码分析 - 图42

=>com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow
nacos源码分析 - 图43
继续跟踪发现它也是通过Http请求get方式来获取服务列表
nacos源码分析 - 图44
继续跟踪可以发现http请求返回的是一个json
nacos源码分析 - 图45
下一步是将json进行解析获取到服务信息,然后将其存入到本地的ConcurrentHashMap中
nacos源码分析 - 图46

Nacos服务端代码
构造服务信息Json过程:
nacos源码分析 - 图47
nacos源码分析 - 图48
nacos源码分析 - 图49
nacos源码分析 - 图50

至此服务发现流程结束

3. nacos配置中⼼源码

3.1加载配置中⼼源码分析

nacos源码分析 - 图51
Nacos配置数据,在单机模式下如果设置了存储数据源为MySQL,则所以配置信息都会存储两份 一个是本地配置文件,一份是存储到MySQL数据库,响应的时候从本地配置文件中直接加载返回

3.2加载配置中⼼主要源码跟踪

  1. springboot项⽬⾃动启动会加载spring-cloud-alibaba-nacos-config下⾯的spring.factories
    nacos源码分析 - 图52

  2. NacosConfigBootstrapConfiguration类中会声明2个Bean ```java /*

    • Copyright (C) 2018 the original author or authors. *
    • Licensed under the Apache License, Version 2.0 (the “License”);
    • you may not use this file except in compliance with the License.
    • You may obtain a copy of the License at *
    • http://www.apache.org/licenses/LICENSE-2.0 *
    • Unless required by applicable law or agreed to in writing, software
    • distributed under the License is distributed on an “AS IS” BASIS,
    • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    • See the License for the specific language governing permissions and
    • limitations under the License. */

package com.alibaba.cloud.nacos;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

import com.alibaba.cloud.nacos.client.NacosPropertySourceLocator;

/**

  • @author xiaojing */ @Configuration @ConditionalOnProperty(name = “spring.cloud.nacos.config.enabled”, matchIfMissing = true) public class NacosConfigBootstrapConfiguration {

    /**

    • nacos配置属性对象声明 / @Bean @ConditionalOnMissingBean public NacosConfigProperties nacosConfigProperties() { return new NacosConfigProperties(); }

      //nacos属性源定位器声明 @Bean public NacosPropertySourceLocator nacosPropertySourceLocator(

      1. NacosConfigProperties nacosConfigProperties) {

      return new NacosPropertySourceLocator(nacosConfigProperties); }

}

  1. ![](https://cdn.nlark.com/yuque/0/2021/png/12497888/1618649059530-007b782f-17d3-40b5-8959-9848bdaf5874.png#clientId=u19cd54d9-8397-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=418&id=u307a9328&margin=%5Bobject%20Object%5D&originHeight=836&originWidth=1528&originalType=binary&ratio=1&rotation=0&showTitle=false&size=190965&status=done&style=none&taskId=u6ad901f9-dc70-4096-8efa-26c32d1fd8a&title=&width=764)<br />3.在NacosPropertySourceLocator类中有个locate⽅法在spring boot启动时会最终调⽤这个⽅法.
  2. ```java
  3. @Override
  4. public PropertySource<?> locate(Environment env) {
  5. ConfigService configService = nacosConfigProperties.configServiceInstance();
  6. if (null == configService) {
  7. log.warn("no instance of config service found, can't load config from nacos");
  8. return null;
  9. }
  10. long timeout = nacosConfigProperties.getTimeout();
  11. nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
  12. timeout);
  13. String name = nacosConfigProperties.getName();
  14. String dataIdPrefix = nacosConfigProperties.getPrefix();
  15. if (StringUtils.isEmpty(dataIdPrefix)) {
  16. dataIdPrefix = name;
  17. }
  18. if (StringUtils.isEmpty(dataIdPrefix)) {
  19. //获取应用名
  20. dataIdPrefix = env.getProperty("spring.application.name");
  21. }
  22. CompositePropertySource composite = new CompositePropertySource(
  23. NACOS_PROPERTY_SOURCE_NAME);
  24. //加载共享配置信息
  25. loadSharedConfiguration(composite);
  26. //加载扩展配置信息
  27. loadExtConfiguration(composite);
  28. //加载应用配置信息
  29. loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
  30. return composite;
  31. }

4.主要看加载应⽤配置loadApplicationConfiguration这个⽅法
nacos源码分析 - 图53
5. loadNacosDataIfPresent⽅法
nacos源码分析 - 图54
6. nacosPropertySourceBuilder.build⽅法
nacos源码分析 - 图55
7. configService.getConfig经过层层调⽤会调⽤getConfigInner(…)⽅法
//调⽤ClientWorker的getServerConfig⽅法获取内容
nacos源码分析 - 图56
8. worker.getServerConfig⽅法
//默认访问路径:http://ip:port/nacos/v1/ns/configs
nacos源码分析 - 图57

9.发送http的get请求后会到达nacos的ConfigController的getConfig⽅法完成查询配置操作
nacos源码分析 - 图58

3.3客户端动态感知源码分析

上⾯分析了初始化的时候客户端如何加载配置,那么当服务端的配置信息变更的时候,客户端⼜是如何动态感知的呢?
nacos源码分析 - 图59
注意:
1、客户端长轮询任务在给Nacos发生请求的时候是一个长连接,超时时间是30秒,Nacos服务端设置的延时任务延时29.5秒,防止连接超时
2、延时任务监听配置是否有更改,有修改是通知上图的监听器,响应客户端最新数据

3.4客户端动态感知主要源码跟踪
1.在NacosPropertySourceLocator类中有个locate⽅法中会创建NacosConfigService对象
nacos源码分析 - 图60
2. NacosConfigService构造⽅法

nacos源码分析 - 图61
3. ClientWorker实例的初始化
nacos源码分析 - 图62
nacos源码分析 - 图63
4. checkConfigInfo(),该⽅法开始会检查配置
cacheMap就是从配置中心读取的配置的本地缓存
nacos源码分析 - 图64
5. LongPollingRunnable的run⽅法

  1. @Override
  2. public void run() {
  3. List<CacheData> cacheDatas = new ArrayList<CacheData>();
  4. List<String> inInitializingCacheList = new ArrayList<String>();
  5. try {
  6. // check failover config
  7. for (CacheData cacheData : cacheMap.get().values()) {
  8. if (cacheData.getTaskId() == taskId) {
  9. cacheDatas.add(cacheData);
  10. try {
  11. //检查本地配置
  12. checkLocalConfig(cacheData);
  13. if (cacheData.isUseLocalConfigInfo()) {
  14. cacheData.checkListenerMd5();
  15. }
  16. } catch (Exception e) {
  17. LOGGER.error("get local config info error", e);
  18. }
  19. }
  20. }
  21. //检查服务器配置 向Nacos服务器查询 获取更新的Key
  22. // 从服务端获取发⽣了变化的配置的key(chengedGroupKeys表示服务端告诉客户端,哪些配置发⽣了变化)
  23. List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
  24. // 遍历发⽣了变化的key,并根据key去服务端请求最新配置,并更新到内存缓存中
  25. for (String groupKey : changedGroupKeys) {
  26. String[] key = GroupKey.parseKey(groupKey);
  27. String dataId = key[0];
  28. String group = key[1];
  29. String tenant = null;
  30. if (key.length == 3) {
  31. tenant = key[2];
  32. }
  33. try {
  34. // 从远程服务端获取最新的配置,并缓存到内存中
  35. String content = getServerConfig(dataId, group, tenant, 3000L);
  36. CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
  37. cache.setContent(content);
  38. LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
  39. agent.getName(), dataId, group, tenant, cache.getMd5(),
  40. ContentUtils.truncateContent(content));
  41. } catch (NacosException ioe) {
  42. //.....省略
  43. }
  44. }
  45. for (CacheData cacheData : cacheDatas) {
  46. if (!cacheData.isInitializing() || inInitializingCacheList
  47. .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
  48. cacheData.checkListenerMd5();
  49. cacheData.setInitializing(false);
  50. }
  51. }
  52. inInitializingCacheList.clear();
  53. // 继续执⾏该任务
  54. executorService.execute(this);
  55. } catch (Throwable e) {
  56. //****省略
  57. }
  58. }
  59. }
  1. checkUpdateDataIds⽅法会调⽤checkUpdateConfigStr⽅法 ```java /**

    • 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

      List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

      List headers = new ArrayList(2); //设置超时时间30秒 headers.add(“Long-Pulling-Timeout”); headers.add(“” + timeout);

      // told server do not hang me up if new initializing cacheData added in //添加⻓轮询请求头 if (isInitializingCacheList) {

      1. headers.add("Long-Pulling-Timeout-No-Hangup");
      2. headers.add("true");

      }

      if (StringUtils.isBlank(probeUpdateString)) {

      1. return Collections.emptyList();

      }

      try {

      1. // 请求路径:http://ip:port/nacos/v1/ns/configs/listener
      2. HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
      3. agent.getEncode(), timeout);
      4. if (HttpURLConnection.HTTP_OK == result.code) {
      5. setHealthServer(true);
      6. return parseUpdateDataIdResponse(result.content);
      7. } else {

      };

      return Collections.emptyList(); }

  1. 7.发送httpget请求后会到达nacosConfigControllergetConfig⽅法完成查询配置操作<br />![](https://cdn.nlark.com/yuque/0/2021/png/12497888/1618652770209-5e6617e2-d9ad-4cf3-99ba-384abdc63aed.png#clientId=u19cd54d9-8397-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=504&id=ue24bd4ce&margin=%5Bobject%20Object%5D&originHeight=1008&originWidth=1484&originalType=binary&ratio=1&rotation=0&showTitle=false&size=261465&status=done&style=none&taskId=u4abaece6-f3b2-4a01-9ad7-c20600283c8&title=&width=742)<br />8. doPollingConfig⽅法
  2. ```java
  3. /**
  4. * 轮询接口
  5. */
  6. public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
  7. Map<String, String> clientMd5Map, int probeRequestSize)
  8. throws IOException {
  9. // 长轮询 根据请求头判断是否是长轮询
  10. if (LongPollingService.isSupportLongPolling(request)) {
  11. longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
  12. return HttpServletResponse.SC_OK + "";
  13. }
  14. // else 兼容短轮询逻辑
  15. List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
  16. // 兼容短轮询result
  17. String oldResult = MD5Util.compareMd5OldResult(changedGroups);
  18. String newResult = MD5Util.compareMd5ResultString(changedGroups);
  19. String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
  20. if (version == null) {
  21. version = "2.0.0";
  22. }
  23. int versionNum = Protocol.getVersionNumber(version);
  24. /**
  25. * 2.0.4版本以前, 返回值放入header中
  26. */
  27. if (versionNum < START_LONGPOLLING_VERSION_NUM) {
  28. response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
  29. response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
  30. } else {
  31. request.setAttribute("content", newResult);
  32. }
  33. Loggers.AUTH.info("new content:" + newResult);
  34. // 禁用缓存
  35. response.setHeader("Pragma", "no-cache");
  36. response.setDateHeader("Expires", 0);
  37. response.setHeader("Cache-Control", "no-cache,no-store");
  38. response.setStatus(HttpServletResponse.SC_OK);
  39. return HttpServletResponse.SC_OK + "";
  40. }
  1. addLongPollingClient⽅法

    1. public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
    2. int probeRequestSize) {
    3. String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    4. String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    5. String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    6. String tag = req.getHeader("Vipserver-Tag");
    7. int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    8. /**
    9. * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance
    10. */
    11. long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    12. if (isFixedPolling()) {
    13. timeout = Math.max(10000, getFixedPollingInterval());
    14. // do nothing but set fix polling timeout
    15. } else {
    16. long start = System.currentTimeMillis();
    17. //比较客户端MD5和Nacos服务端存的MD5是否有不一样
    18. List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
    19. if (changedGroups.size() > 0) {
    20. //有变更 直接返回
    21. generateResponse(req, rsp, changedGroups);
    22. LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
    23. System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
    24. clientMd5Map.size(), probeRequestSize, changedGroups.size());
    25. return;
    26. } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
    27. LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
    28. RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
    29. changedGroups.size());
    30. return;
    31. }
    32. }
    33. String ip = RequestUtil.getRemoteIp(req);
    34. // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
    35. final AsyncContext asyncContext = req.startAsync();
    36. // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
    37. asyncContext.setTimeout(0L);
    38. // 开启⼀个⻓轮询线程 由线程池去处理Request请求
    39. scheduler.execute(
    40. new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    41. }
  2. ClientLongPolling的run⽅法

    1. @Override
    2. public void run() {
    3. // 调度⼀个延时执⾏的任务,在这段延时的时间内,会监听配置的修改
    4. asyncTimeoutFuture = scheduler.schedule(new Runnable() {
    5. @Override
    6. public void run() {
    7. try {
    8. getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
    9. /**
    10. * 删除订阅关系
    11. */
    12. allSubs.remove(ClientLongPolling.this);
    13. if (isFixedPolling()) {
    14. LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
    15. (System.currentTimeMillis() - createTime),
    16. "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
    17. "polling",
    18. clientMd5Map.size(), probeRequestSize);
    19. List<String> changedGroups = MD5Util.compareMd5(
    20. (HttpServletRequest)asyncContext.getRequest(),
    21. (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
    22. if (changedGroups.size() > 0) {
    23. sendResponse(changedGroups);
    24. } else {
    25. sendResponse(null);
    26. }
    27. } else {
    28. LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
    29. (System.currentTimeMillis() - createTime),
    30. "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
    31. "polling",
    32. clientMd5Map.size(), probeRequestSize);
    33. sendResponse(null);
    34. }
    35. } catch (Throwable t) {
    36. LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
    37. }
    38. }
    39. }, timeoutTime, TimeUnit.MILLISECONDS); //超时时间29.5秒
    40. allSubs.add(this);
    41. }
  3. onEvent⽅法
    nacos源码分析 - 图65
    12. DataChangeTask

    1. @Override
    2. public void run() {
    3. try {
    4. ConfigService.getContentBetaMd5(groupKey);
    5. for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
    6. ClientLongPolling clientSub = iter.next();
    7. if (clientSub.clientMd5Map.containsKey(groupKey)) {
    8. // 如果beta发布且不在beta列表直接跳过
    9. if (isBeta && !betaIps.contains(clientSub.ip)) {
    10. continue;
    11. }
    12. // 如果tag发布且不在tag列表直接跳过
    13. if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
    14. continue;
    15. }
    16. getRetainIps().put(clientSub.ip, System.currentTimeMillis());
    17. iter.remove(); // 删除订阅关系
    18. LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
    19. (System.currentTimeMillis() - changeTime),
    20. "in-advance",
    21. RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
    22. "polling",
    23. clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
    24. //响应客户端请求
    25. clientSub.sendResponse(Arrays.asList(groupKey));
    26. }
    27. }
    28. } catch (Throwable t) {
    29. LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
    30. }
    31. }