一、获取配置

NacosConfigService

  1. private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
  2. group = this.blank2defaultGroup(group);
  3. ParamUtils.checkKeyParam(dataId, group);
  4. ConfigResponse cr = new ConfigResponse();
  5. cr.setDataId(dataId);
  6. cr.setTenant(tenant);
  7. cr.setGroup(group);
  8. String content = LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant);
  9. String encryptedDataKey;
  10. if (content != null) {
  11. LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
  12. cr.setContent(content);
  13. encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);
  14. cr.setEncryptedDataKey(encryptedDataKey);
  15. this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
  16. content = cr.getContent();
  17. return content;
  18. } else {
  19. try {
  20. ConfigResponse response = this.worker.getServerConfig(dataId, group, tenant, timeoutMs);
  21. cr.setContent(response.getContent());
  22. cr.setEncryptedDataKey(response.getEncryptedDataKey());
  23. this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
  24. content = cr.getContent();
  25. return content;
  26. } catch (NacosException var9) {
  27. if (403 == var9.getErrCode()) {
  28. throw var9;
  29. } else {
  30. LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", new Object[]{this.agent.getName(), dataId, group, tenant, var9.toString()});
  31. LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
  32. content = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), dataId, group, tenant);
  33. cr.setContent(content);
  34. encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);
  35. cr.setEncryptedDataKey(encryptedDataKey);
  36. this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
  37. content = cr.getContent();
  38. return content;
  39. }
  40. }
  41. }
  42. }

NacosConfigService构造器
NacosConfigService创建时在构造器中会创建CacheData

  1. public NacosConfigService(Properties properties) throws NacosException {
  2. ValidatorUtils.checkInitParam(properties);
  3. String encodeTmp = properties.getProperty("encode");
  4. if (StringUtils.isBlank(encodeTmp)) {
  5. this.encode = "UTF-8";
  6. } else {
  7. this.encode = encodeTmp.trim();
  8. }
  9. this.initNamespace(properties);
  10. this.configFilterChainManager = new ConfigFilterChainManager(properties);
  11. this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
  12. this.agent.start();
  13. this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
  14. }

ClientWorker
创建任务线程池

  1. public class ClientWorker implements Closeable {
  2. // 检测是否需要提交longPolling任务到executorService,如果需要则提交
  3. final ScheduledExecutorService executor;
  4. // 执行长轮询,一般情况下执行listener回调也是在这个线程里
  5. final ScheduledExecutorService executorService;
  6. // groupKey -> cacheData
  7. private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();
  8. // 类似httpClient
  9. private final HttpAgent agent;
  10. // 钩子管理器
  11. private final ConfigFilterChainManager configFilterChainManager;
  12. // nacos服务端是否健康
  13. private boolean isHealthServer = true;
  14. // 长轮询超时时间 默认30s
  15. private long timeout;
  16. // 当前长轮询任务数量
  17. private double currentLongingTaskCount = 0;
  18. // 长轮询发生异常,默认延迟2s进行下次长轮询
  19. private int taskPenaltyTime;
  20. // 是否在添加监听器时,主动获取最新配置
  21. private boolean enableRemoteSyncConfig = false;
  22. }
  1. public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
  2. final Properties properties) {
  3. this.agent = agent;
  4. this.configFilterChainManager = configFilterChainManager;
  5. // Initialize the timeout parameter
  6. // 初始化一些参数,如:timeout
  7. init(pr operties);
  8. // 初始化一些参数,如:timeout taskPenaltyTime 以及是否启用远程配置
  9. this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
  10. @Override
  11. public Thread newThread(Runnable r) {
  12. Thread t = new Thread(r);
  13. t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
  14. t.setDaemon(true);
  15. return t;
  16. }
  17. });
  18. // 执行LongPollingRunnable的执行器,固定线程数=核数
  19. this.executorService = Executors
  20. .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
  21. @Override
  22. public Thread newThread(Runnable r) {
  23. Thread t = new Thread(r);
  24. t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
  25. t.setDaemon(true);
  26. return t;
  27. }
  28. });
  29. // 检测并提交LongPollingRunnable到this.executorService
  30. this.executor.scheduleWithFixedDelay(new Runnable() {
  31. @Override
  32. public void run() {
  33. try {
  34. checkConfigInfo();
  35. } catch (Throwable e) {
  36. LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
  37. }
  38. }
  39. }, 1L, 10L, TimeUnit.MILLISECONDS);
  40. }

检查配置

  1. public void checkConfigInfo() {
  2. // 派遣任务
  3. int listenerSize = cacheMap.size();
  4. // 向上舍入 longingTaskCount.
  5. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
  6. if (longingTaskCount > currentLongingTaskCount) {
  7. for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
  8. // The task list is no order.So it maybe has issues when changing.
  9. executorService.execute(new LongPollingRunnable(i));
  10. }
  11. currentLongingTaskCount = longingTaskCount;
  12. }
  13. }

二、注册监听

NacosContextRefresher实现了ApplicationListener当springboot容器初始化完成后会调用onApplicationEvent()方法

NacosContextRefresher

  1. public void onApplicationEvent(ApplicationReadyEvent event) {
  2. if (this.ready.compareAndSet(false, true)) {
  3. this.registerNacosListenersForApplications();
  4. }
  5. }
  1. private void registerNacosListenersForApplications() {
  2. if (this.isRefreshEnabled()) {
  3. Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
  4. while(var1.hasNext()) {
  5. NacosPropertySource propertySource = (NacosPropertySource)var1.next();
  6. if (propertySource.isRefreshable()) {
  7. String dataId = propertySource.getDataId();
  8. this.registerNacosListener(propertySource.getGroup(), dataId);
  9. }
  10. }
  11. }
  12. }
  1. private void registerNacosListener(final String groupKey, final String dataKey) {
  2. String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
  3. Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {
  4. return new AbstractSharedListener() {
  5. public void innerReceive(String dataId, String group, String configInfo) {
  6. NacosContextRefresher.refreshCountIncrement();
  7. NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
  8. NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
  9. if (NacosContextRefresher.log.isDebugEnabled()) {
  10. NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));
  11. }
  12. }
  13. };
  14. });
  15. try {
  16. this.configService.addListener(dataKey, groupKey, listener);
  17. } catch (NacosException var6) {
  18. log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);
  19. }
  20. }

NacosConfigService

  1. @Override
  2. public void addListener(String dataId, String group, Listener listener) throws NacosException {
  3. worker.addTenantListeners(dataId, group, Arrays.asList(listener));
  4. }

ClientWorker

  1. // ClientWorker
  2. public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
  3. throws NacosException {
  4. group = null2defaultGroup(group);
  5. String tenant = agent.getTenant();
  6. // 获取CacheData
  7. CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
  8. // 给CacheData注册监听器
  9. for (Listener listener : listeners) {
  10. cache.addListener(listener);
  11. }
  12. }

ClientWorker

  1. public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
  2. String key = GroupKey.getKeyTenant(dataId, group, tenant);
  3. // 1 如果缓存中已经存在,直接返回
  4. CacheData cacheData = cacheMap.get(key);
  5. if (cacheData != null) {
  6. return cacheData;
  7. }
  8. // 2 创建CacheData,这里会使用本地配置文件设置为初始配置
  9. cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
  10. // 3 多线程操作cacheMap再次校验是否已经缓存了cacheData
  11. CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
  12. // 4 如果当前线程成功设置了key-cacheData,返回cacheData
  13. if (lastCacheData == null) {
  14. if (enableRemoteSyncConfig) { // 是否允许添加监听时实时同步配置,默认false
  15. String[] ct = getServerConfig(dataId, group, tenant, 3000L);
  16. cacheData.setContent(ct[0]);
  17. }
  18. // 计算所属长轮询任务id
  19. int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
  20. cacheData.setTaskId(taskId);
  21. lastCacheData = cacheData;
  22. }
  23. // 这里设置cacheData正在初始化,让下次长轮询立即返回结果
  24. lastCacheData.setInitializing(true);
  25. // 5 否则返回的cacheData是老的cacheData
  26. return lastCacheData;
  27. }

CacheData

  1. public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group, String tenant) {
  2. if (null != dataId && null != group) {
  3. this.name = name;
  4. this.configFilterChainManager = configFilterChainManager;
  5. this.dataId = dataId;
  6. this.group = group;
  7. this.tenant = tenant;
  8. this.listeners = new CopyOnWriteArrayList();
  9. this.isInitializing = true;
  10. this.content = this.loadCacheContentFromDiskLocal(name, dataId, group, tenant);
  11. this.md5 = getMd5String(this.content);
  12. this.encryptedDataKey = this.loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);
  13. } else {
  14. throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
  15. }
  16. }

CacheData

  1. // CacheData
  2. // 注册在这个tenant-group-dataId配置上的监听器
  3. private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
  4. public void addListener(Listener listener) {
  5. ManagerListenerWrap wrap =
  6. (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
  7. : new ManagerListenerWrap(listener, md5);
  8. if (listeners.addIfAbsent(wrap)) {
  9. LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
  10. listeners.size());
  11. }
  12. }

三、长轮询流程

当监听器开启之后会cachemap中添加监听的配置,这时候在检查配置时候会开启长轮询任务
检查配置longingTaskCount > currentLongingTaskCount这个判断才能进入方法

  1. public void checkConfigInfo() {
  2. // 派遣任务
  3. int listenerSize = cacheMap.size();
  4. // 向上舍入 longingTaskCount.
  5. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
  6. if (longingTaskCount > currentLongingTaskCount) {
  7. for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
  8. // The task list is no order.So it maybe has issues when changing.
  9. executorService.execute(new LongPollingRunnable(i));
  10. }
  11. currentLongingTaskCount = longingTaskCount;
  12. }
  13. }

长轮询任务分为几个步骤:

  • 处理failover配置:判断当前CacheData是否使用failover配置(ClientWorker.checkLocalConfig),如果使用failover配置,则校验本地配置文件内容是否发生变化,发生变化则触发监听器(CacheData.checkListenerMd5)。这一步其实和长轮询无关。
  • 对于所有非failover配置,执行长轮询,返回发生改变的groupKey(ClientWorker.checkUpdateDataIds)。
  • 根据返回的groupKey,查询服务端实时配置并保存snapshot(ClientWorker.getServerConfig)
  • 更新内存CacheData的配置content。
  • 校验配置是否发生变更,通知监听器(CacheData.checkListenerMd5)。
  • 如果正常执行本次长轮询,立即提交长轮询任务,执行下一次长轮询;发生异常,延迟2s提交长轮询任务。
  1. class LongPollingRunnable implements Runnable {
  2. private final int taskId;
  3. public LongPollingRunnable(int taskId) {
  4. this.taskId = taskId;
  5. }
  6. @Override
  7. public void run() {
  8. // 当前长轮询任务负责的CacheData集合
  9. List<CacheData> cacheDatas = new ArrayList<CacheData>();
  10. // 正在初始化的CacheData 即刚构建的CacheData,内部的content仍然是snapshot版本
  11. List<String> inInitializingCacheList = new ArrayList<String>();
  12. try {
  13. // 1. 对于failover配置文件的处理
  14. for (CacheData cacheData : cacheMap.values()) {
  15. if (cacheData.getTaskId() == taskId) {
  16. cacheDatas.add(cacheData);
  17. try {
  18. // 判断cacheData是否需要使用failover配置,设置isUseLocalConfigInfo
  19. // 如果需要则更新内存中的配置
  20. checkLocalConfig(cacheData);
  21. // 使用failover配置则检测content内容是否发生变化,如果变化则通知监听器
  22. if (cacheData.isUseLocalConfigInfo()) {
  23. cacheData.checkListenerMd5();
  24. }
  25. } catch (Exception e) {
  26. LOGGER.error("get local config info error", e);
  27. }
  28. }
  29. }
  30. // 2. 对于所有非failover配置,执行长轮询,返回发生改变的groupKey
  31. List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
  32. for (String groupKey : changedGroupKeys) {
  33. String[] key = GroupKey.parseKey(groupKey);
  34. String dataId = key[0];
  35. String group = key[1];
  36. String tenant = null;
  37. if (key.length == 3) {
  38. tenant = key[2];
  39. }
  40. try {
  41. // 3. 对于发生改变的配置,查询实时配置并保存snapshot
  42. String[] ct = getServerConfig(dataId, group, tenant, 3000L);
  43. // 4. 更新内存中的配置
  44. CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
  45. cache.setContent(ct[0]);
  46. if (null != ct[1]) {
  47. cache.setType(ct[1]);
  48. }
  49. } catch (NacosException ioe) {
  50. LOGGER.error(message, ioe);
  51. }
  52. }
  53. // 5. 对于非failover配置,触发监听器
  54. for (CacheData cacheData : cacheDatas) {
  55. // 排除failover文件
  56. if (!cacheData.isInitializing() || inInitializingCacheList
  57. .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
  58. // 校验md5是否发生变化,如果发生变化通知listener
  59. cacheData.checkListenerMd5();
  60. cacheData.setInitializing(false);
  61. }
  62. }
  63. inInitializingCacheList.clear();
  64. // 6-1. 都执行完成以后,再次提交长轮询任务
  65. executorService.execute(this);
  66. } catch (Throwable e) {
  67. // 6-2. 如果长轮询执行发生异常,延迟2s执行下一次长轮询
  68. executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
  69. }
  70. }
  71. }

checkLocalConfig

  1. private void checkLocalConfig(CacheData cacheData) {
  2. final String dataId = cacheData.dataId;
  3. final String group = cacheData.group;
  4. final String tenant = cacheData.tenant;
  5. File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
  6. // 当isUseLocalConfigInfo=false 且 failover配置文件存在时,使用failover配置文件,并更新内存中的配置
  7. if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
  8. String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
  9. final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
  10. cacheData.setUseLocalConfigInfo(true);
  11. cacheData.setLocalConfigInfoVersion(path.lastModified());
  12. cacheData.setContent(content);
  13. return;
  14. }
  15. // 当isUseLocalConfigInfo=true 且 failover配置文件不存在时,不使用failover配置文件
  16. if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
  17. cacheData.setUseLocalConfigInfo(false);
  18. return;
  19. }
  20. // 当isUseLocalConfigInfo=true 且 failover配置文件存在时,使用failover配置文件并更新内存中的配置
  21. if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
  22. .lastModified()) {
  23. String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
  24. final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
  25. cacheData.setUseLocalConfigInfo(true);
  26. cacheData.setLocalConfigInfoVersion(path.lastModified());
  27. cacheData.setContent(content);
  28. }
  29. }

什么情况下会使用failover本地配置
长轮询任务不仅仅向服务端发起请求获取配置发生变更的groupKey,而且执行了failover本地配置的监听。
ClientWorker.checkLocalConfig判断了当前CacheData是否需要使用failover本地配置,这类配置不会从服务端获取,只能在文件系统中手动更新。

当文件系统指定路径下的failover配置文件存在时,就会优先使用failover配置文件;当failover配置文件被删除时,又会切换为使用server端配置。同时,如果使用failover配置文件,此处会更新CacheData中的配置。

  1. private void checkLocalConfig(CacheData cacheData) {
  2. final String dataId = cacheData.dataId;
  3. final String group = cacheData.group;
  4. final String tenant = cacheData.tenant;
  5. File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
  6. // 当isUseLocalConfigInfo=false 且 failover配置文件存在时,使用failover配置文件,并更新内存中的配置
  7. if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
  8. String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
  9. final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
  10. cacheData.setUseLocalConfigInfo(true);
  11. cacheData.setLocalConfigInfoVersion(path.lastModified());
  12. cacheData.setContent(content);
  13. return;
  14. }
  15. // 当isUseLocalConfigInfo=true 且 failover配置文件不存在时,不使用failover配置文件
  16. if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
  17. cacheData.setUseLocalConfigInfo(false);
  18. return;
  19. }
  20. // 当isUseLocalConfigInfo=true 且 failover配置文件存在时,使用failover配置文件并更新内存中的配置
  21. if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
  22. .lastModified()) {
  23. String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
  24. final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
  25. cacheData.setUseLocalConfigInfo(true);
  26. cacheData.setLocalConfigInfoVersion(path.lastModified());
  27. cacheData.setContent(content);
  28. }
  29. }

向Server端发起长轮询请求
针对所有非failover配置,通过ClientWorker.checkUpdateDataIds发起长轮询请求。
这里会统计所有非failover配置,并拼接请求业务报文:

  • 有namespace的CacheData:dataId group md5 namespace
  • 无namespace的CacheData:dataId group md5

此外,这里过滤出了正在初始化的CacheData,即CacheData刚构建,内部content仍然是本地snapshot版本,这部分配置将会有特殊处理。

  1. List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
  2. StringBuilder sb = new StringBuilder();
  3. // 统计所有非failover的cacheData,拼接为"dataId group md5"或"dataId group md5 namespace"
  4. for (CacheData cacheData : cacheDatas) {
  5. if (!cacheData.isUseLocalConfigInfo()) {
  6. sb.append(cacheData.dataId).append(WORD_SEPARATOR);
  7. sb.append(cacheData.group).append(WORD_SEPARATOR);
  8. if (StringUtils.isBlank(cacheData.tenant)) {
  9. sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
  10. } else {
  11. sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
  12. sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
  13. }
  14. // 将首次监听的CacheData放入inInitializingCacheList
  15. if (cacheData.isInitializing()) {
  16. inInitializingCacheList
  17. .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
  18. }
  19. }
  20. }
  21. boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
  22. // 实际发起请求
  23. return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
  24. }

checkUpdateConfigStr方法负责调用服务端/v1/cs/configs/listener长轮询接口,并解析报文返回。关注几个点:

  • 请求参数Listening-Configs是上面拼接的业务报文
  • 长轮询超时时间默认30s,放在请求头Long-Pulling-Timeout里
  • 如果本次长轮询包含首次监听的配置项,在请求头设置Long-Pulling-Timeout-No-Hangup=true,让服务端立即返回本次轮询结果
  • 服务端/v1/cs/configs/listener接口负责处理长轮询请求
  • parseUpdateDataIdResponse方法会解析服务端返回报文
  1. // ClientWorker
  2. List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
  3. Map<String, String> params = new HashMap<String, String>(2);
  4. // 拼接的业务报文 key = Listening-Configs
  5. params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
  6. Map<String, String> headers = new HashMap<String, String>(2);
  7. // 长轮询超时时间30s
  8. headers.put("Long-Pulling-Timeout", "" + timeout);
  9. // 告诉服务端,本次长轮询包含首次监听的配置项,不要hold住请求,立即返回
  10. if (isInitializingCacheList) {
  11. headers.put("Long-Pulling-Timeout-No-Hangup", "true");
  12. }
  13. // 如果没有需要监听的
  14. if (StringUtils.isBlank(probeUpdateString)) {
  15. return Collections.emptyList();
  16. }
  17. try {
  18. // readTimeout = 45s
  19. long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
  20. // 请求/v1/cs/configs/listener
  21. HttpRestResult<String> result = agent
  22. .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
  23. readTimeoutMs);
  24. if (result.ok()) {
  25. setHealthServer(true);
  26. // 解析返回报文
  27. return parseUpdateDataIdResponse(result.getData());
  28. } else {
  29. setHealthServer(false);
  30. }
  31. } catch (Exception e) {
  32. setHealthServer(false);
  33. throw e;
  34. }
  35. return Collections.emptyList();
  36. }

parseUpdateDataIdResponse解析服务端返回报文,每行报文代表一个发生配置变化的groupKey。

  1. private List<String> parseUpdateDataIdResponse(String response) {
  2. if (StringUtils.isBlank(response)) {
  3. return Collections.emptyList();
  4. }
  5. response = URLDecoder.decode(response, "UTF-8");
  6. List<String> updateList = new LinkedList<String>();
  7. // 按行分割
  8. for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
  9. if (!StringUtils.isBlank(dataIdAndGroup)) {
  10. // 每行按空格分割,拼接为dataId+group+namespace 或 dataId+group
  11. String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
  12. String dataId = keyArr[0];
  13. String group = keyArr[1];
  14. if (keyArr.length == 2) {
  15. updateList.add(GroupKey.getKey(dataId, group));
  16. } else if (keyArr.length == 3) {
  17. String tenant = keyArr[2];
  18. updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
  19. } else {
  20. LOGGER.error();
  21. }
  22. }
  23. }
  24. return updateList;
  25. }

校验md5变化并触发监听器

收到服务端返回发生变化的配置项后,客户端会通过/v1/cs/configs接口获取对应的配置,并将配置保存到本地文件系统作为snapshot,这部分在配置查询部分看过,都是调用ClientWorker#getServerConfig方法。最后会将配置更新到CacheData的content字段中。
上述步骤处理完成后,通过CacheData.checkListenerMd5校验配置是否发生变更,并触发监听器。

  1. // CacheData.java
  2. // 注册在这个CacheData配置上的监听器
  3. private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
  4. // 配置的md5
  5. private volatile String md5;
  6. void checkListenerMd5() {
  7. for (ManagerListenerWrap wrap : listeners) {
  8. // 比较CacheData中的md5与Listener中上次的md5是否相同
  9. if (!md5.equals(wrap.lastCallMd5)) {
  10. // 不相同则触发监听器
  11. safeNotifyListener(dataId, group, content, type, md5, wrap);
  12. }
  13. }
  14. }

safeNotifyListener方法是通知监听器的主逻辑,如果Listener配置了自己的Executor,将在自己配置的线程服务里执行监听逻辑,默认使用长轮询线程执行监听逻辑。

  1. // CacheData.java
  2. private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
  3. final String md5, final ManagerListenerWrap listenerWrap) {
  4. final Listener listener = listenerWrap.listener;
  5. Runnable job = new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. // 如果是AbstractSharedListener,把dataId和group放到它的成员变量里
  10. if (listener instanceof AbstractSharedListener) {
  11. AbstractSharedListener adapter = (AbstractSharedListener) listener;
  12. adapter.fillContext(dataId, group);
  13. }
  14. ConfigResponse cr = new ConfigResponse();
  15. cr.setDataId(dataId);
  16. cr.setGroup(group);
  17. cr.setContent(content);
  18. // 给用户的钩子,忽略
  19. configFilterChainManager.doFilter(null, cr);
  20. String contentTmp = cr.getContent();
  21. // 触发监听器的receiveConfigInfo方法
  22. listener.receiveConfigInfo(contentTmp);
  23. // 如果是AbstractConfigChangeListener实例,触发receiveConfigChange方法
  24. if (listener instanceof AbstractConfigChangeListener) {
  25. Map data = ConfigChangeHandler.getInstance()
  26. .parseChangeData(listenerWrap.lastContent, content, type);
  27. ConfigChangeEvent event = new ConfigChangeEvent(data);
  28. ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
  29. listenerWrap.lastContent = content;
  30. }
  31. // 更新监听器的上次md5值
  32. listenerWrap.lastCallMd5 = md5;
  33. } catch (NacosException ex) {
  34. LOGGER.error();
  35. } catch (Throwable t) {
  36. LOGGER.error();
  37. } finally {
  38. Thread.currentThread().setContextClassLoader(myClassLoader);
  39. }
  40. }
  41. };
  42. try {
  43. // 如果监听器配置了executor,使用配置的executor执行上面的任务
  44. if (null != listener.getExecutor()) {
  45. listener.getExecutor().execute(job);
  46. } else {
  47. // 否则直接执行,也就是在长轮询线程中执行
  48. job.run();
  49. }
  50. } catch (Throwable t) {
  51. LOGGER.error();
  52. }
  53. }

这里根据Listener的类型会触发不同的监听方法。如果是普通的Listener,会触发receiveConfigInfo方法,得到一个String,是变更后的配置值。

  1. public interface Listener {
  2. Executor getExecutor();
  3. void receiveConfigInfo(final String configInfo);
  4. }

如果是AbstractConfigChangeListener监听器,会触发receiveConfigChange方法,得到一个ConfigChangeEvent。

  1. public abstract class AbstractConfigChangeListener extends AbstractListener {
  2. public abstract void receiveConfigChange(final ConfigChangeEvent event);
  3. // 注意这里receiveConfigInfo是个空实现
  4. @Override
  5. public void receiveConfigInfo(final String configInfo) {
  6. }
  7. }

但是AbstractConfigChangeListener监听是有前提条件的,配置文件必须是yaml格式或properties格式,否则将不会触发Listener逻辑!见ConfigChangeHandler的parseChangeData方法,如果找不到解析器,会返回一个空的map。

  1. public Map parseChangeData(String oldContent, String newContent, String type) throws IOException {
  2. for (ConfigChangeParser changeParser : this.parserList) {
  3. // 判断是否有可以解析这种配置文件类型,目前仅支持properties和yaml
  4. if (changeParser.isResponsibleFor(type)) {
  5. return changeParser.doParse(oldContent, newContent, type);
  6. }
  7. }
  8. return Collections.emptyMap();
  9. }

safeNotifyListener这部分逻辑中构造的ConfigChangeEvent将不会包含任何数据。

  1. if (listener instanceof AbstractConfigChangeListener) {
  2. Map data = ConfigChangeHandler.getInstance()
  3. .parseChangeData(listenerWrap.lastContent, content, type);
  4. // 如果map为空,这里构造的event里的数据就也为空了,监听器感知不到配置变更
  5. ConfigChangeEvent event = new ConfigChangeEvent(data);
  6. ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
  7. listenerWrap.lastContent = content;
  8. }
  1. public class ConfigChangeEvent {
  2. private final Map<String, ConfigChangeItem> data;
  3. public ConfigChangeEvent(Map<String, ConfigChangeItem> data) {
  4. this.data = data;
  5. }
  6. public ConfigChangeItem getChangeItem(String key) {
  7. return data.get(key);
  8. }
  9. public Collection<ConfigChangeItem> getChangeItems() {
  10. return data.values();
  11. }
  12. }