一、获取配置
NacosConfigService
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {group = this.blank2defaultGroup(group);ParamUtils.checkKeyParam(dataId, group);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setTenant(tenant);cr.setGroup(group);String content = LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant);String encryptedDataKey;if (content != null) {LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});cr.setContent(content);encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);this.configFilterChainManager.doFilter((IConfigRequest)null, cr);content = cr.getContent();return content;} else {try {ConfigResponse response = this.worker.getServerConfig(dataId, group, tenant, timeoutMs);cr.setContent(response.getContent());cr.setEncryptedDataKey(response.getEncryptedDataKey());this.configFilterChainManager.doFilter((IConfigRequest)null, cr);content = cr.getContent();return content;} catch (NacosException var9) {if (403 == var9.getErrCode()) {throw var9;} else {LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", new Object[]{this.agent.getName(), dataId, group, tenant, var9.toString()});LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});content = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), dataId, group, tenant);cr.setContent(content);encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);this.configFilterChainManager.doFilter((IConfigRequest)null, cr);content = cr.getContent();return content;}}}}
NacosConfigService构造器
NacosConfigService创建时在构造器中会创建CacheData
public NacosConfigService(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);String encodeTmp = properties.getProperty("encode");if (StringUtils.isBlank(encodeTmp)) {this.encode = "UTF-8";} else {this.encode = encodeTmp.trim();}this.initNamespace(properties);this.configFilterChainManager = new ConfigFilterChainManager(properties);this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));this.agent.start();this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);}
ClientWorker
创建任务线程池
public class ClientWorker implements Closeable {// 检测是否需要提交longPolling任务到executorService,如果需要则提交final ScheduledExecutorService executor;// 执行长轮询,一般情况下执行listener回调也是在这个线程里final ScheduledExecutorService executorService;// groupKey -> cacheDataprivate final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();// 类似httpClientprivate final HttpAgent agent;// 钩子管理器private final ConfigFilterChainManager configFilterChainManager;// nacos服务端是否健康private boolean isHealthServer = true;// 长轮询超时时间 默认30sprivate long timeout;// 当前长轮询任务数量private double currentLongingTaskCount = 0;// 长轮询发生异常,默认延迟2s进行下次长轮询private int taskPenaltyTime;// 是否在添加监听器时,主动获取最新配置private boolean enableRemoteSyncConfig = false;}
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;// Initialize the timeout parameter// 初始化一些参数,如:timeoutinit(pr operties);// 初始化一些参数,如:timeout taskPenaltyTime 以及是否启用远程配置this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});// 执行LongPollingRunnable的执行器,固定线程数=核数this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});// 检测并提交LongPollingRunnable到this.executorServicethis.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}
检查配置
public void checkConfigInfo() {// 派遣任务int listenerSize = cacheMap.size();// 向上舍入 longingTaskCount.int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// The task list is no order.So it maybe has issues when changing.executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}
二、注册监听
NacosContextRefresher实现了ApplicationListener当springboot容器初始化完成后会调用onApplicationEvent()方法
NacosContextRefresher
public void onApplicationEvent(ApplicationReadyEvent event) {if (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}
private void registerNacosListenersForApplications() {if (this.isRefreshEnabled()) {Iterator var1 = NacosPropertySourceRepository.getAll().iterator();while(var1.hasNext()) {NacosPropertySource propertySource = (NacosPropertySource)var1.next();if (propertySource.isRefreshable()) {String dataId = propertySource.getDataId();this.registerNacosListener(propertySource.getGroup(), dataId);}}}}
private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {return new AbstractSharedListener() {public void innerReceive(String dataId, String group, String configInfo) {NacosContextRefresher.refreshCountIncrement();NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));if (NacosContextRefresher.log.isDebugEnabled()) {NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));}}};});try {this.configService.addListener(dataKey, groupKey, listener);} catch (NacosException var6) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);}}
NacosConfigService
@Overridepublic void addListener(String dataId, String group, Listener listener) throws NacosException {worker.addTenantListeners(dataId, group, Arrays.asList(listener));}
ClientWorker
// ClientWorkerpublic void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)throws NacosException {group = null2defaultGroup(group);String tenant = agent.getTenant();// 获取CacheDataCacheData cache = addCacheDataIfAbsent(dataId, group, tenant);// 给CacheData注册监听器for (Listener listener : listeners) {cache.addListener(listener);}}
ClientWorker
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {String key = GroupKey.getKeyTenant(dataId, group, tenant);// 1 如果缓存中已经存在,直接返回CacheData cacheData = cacheMap.get(key);if (cacheData != null) {return cacheData;}// 2 创建CacheData,这里会使用本地配置文件设置为初始配置cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);// 3 多线程操作cacheMap再次校验是否已经缓存了cacheDataCacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);// 4 如果当前线程成功设置了key-cacheData,返回cacheDataif (lastCacheData == null) {if (enableRemoteSyncConfig) { // 是否允许添加监听时实时同步配置,默认falseString[] ct = getServerConfig(dataId, group, tenant, 3000L);cacheData.setContent(ct[0]);}// 计算所属长轮询任务idint taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();cacheData.setTaskId(taskId);lastCacheData = cacheData;}// 这里设置cacheData正在初始化,让下次长轮询立即返回结果lastCacheData.setInitializing(true);// 5 否则返回的cacheData是老的cacheDatareturn lastCacheData;}
CacheData
public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group, String tenant) {if (null != dataId && null != group) {this.name = name;this.configFilterChainManager = configFilterChainManager;this.dataId = dataId;this.group = group;this.tenant = tenant;this.listeners = new CopyOnWriteArrayList();this.isInitializing = true;this.content = this.loadCacheContentFromDiskLocal(name, dataId, group, tenant);this.md5 = getMd5String(this.content);this.encryptedDataKey = this.loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);} else {throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);}}
CacheData
// CacheData// 注册在这个tenant-group-dataId配置上的监听器private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;public void addListener(Listener listener) {ManagerListenerWrap wrap =(listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content): new ManagerListenerWrap(listener, md5);if (listeners.addIfAbsent(wrap)) {LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,listeners.size());}}
三、长轮询流程
当监听器开启之后会cachemap中添加监听的配置,这时候在检查配置时候会开启长轮询任务
检查配置longingTaskCount > currentLongingTaskCount这个判断才能进入方法
public void checkConfigInfo() {// 派遣任务int listenerSize = cacheMap.size();// 向上舍入 longingTaskCount.int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// The task list is no order.So it maybe has issues when changing.executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}
长轮询任务分为几个步骤:
- 处理failover配置:判断当前CacheData是否使用failover配置(ClientWorker.checkLocalConfig),如果使用failover配置,则校验本地配置文件内容是否发生变化,发生变化则触发监听器(CacheData.checkListenerMd5)。这一步其实和长轮询无关。
- 对于所有非failover配置,执行长轮询,返回发生改变的groupKey(ClientWorker.checkUpdateDataIds)。
- 根据返回的groupKey,查询服务端实时配置并保存snapshot(ClientWorker.getServerConfig)
- 更新内存CacheData的配置content。
- 校验配置是否发生变更,通知监听器(CacheData.checkListenerMd5)。
- 如果正常执行本次长轮询,立即提交长轮询任务,执行下一次长轮询;发生异常,延迟2s提交长轮询任务。
class LongPollingRunnable implements Runnable {private final int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {// 当前长轮询任务负责的CacheData集合List<CacheData> cacheDatas = new ArrayList<CacheData>();// 正在初始化的CacheData 即刚构建的CacheData,内部的content仍然是snapshot版本List<String> inInitializingCacheList = new ArrayList<String>();try {// 1. 对于failover配置文件的处理for (CacheData cacheData : cacheMap.values()) {if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {// 判断cacheData是否需要使用failover配置,设置isUseLocalConfigInfo// 如果需要则更新内存中的配置checkLocalConfig(cacheData);// 使用failover配置则检测content内容是否发生变化,如果变化则通知监听器if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// 2. 对于所有非failover配置,执行长轮询,返回发生改变的groupKeyList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {// 3. 对于发生改变的配置,查询实时配置并保存snapshotString[] ct = getServerConfig(dataId, group, tenant, 3000L);// 4. 更新内存中的配置CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}} catch (NacosException ioe) {LOGGER.error(message, ioe);}}// 5. 对于非failover配置,触发监听器for (CacheData cacheData : cacheDatas) {// 排除failover文件if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 校验md5是否发生变化,如果发生变化通知listenercacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();// 6-1. 都执行完成以后,再次提交长轮询任务executorService.execute(this);} catch (Throwable e) {// 6-2. 如果长轮询执行发生异常,延迟2s执行下一次长轮询executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}}}
checkLocalConfig
private void checkLocalConfig(CacheData cacheData) {final String dataId = cacheData.dataId;final String group = cacheData.group;final String tenant = cacheData.tenant;File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);// 当isUseLocalConfigInfo=false 且 failover配置文件存在时,使用failover配置文件,并更新内存中的配置if (!cacheData.isUseLocalConfigInfo() && path.exists()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);return;}// 当isUseLocalConfigInfo=true 且 failover配置文件不存在时,不使用failover配置文件if (cacheData.isUseLocalConfigInfo() && !path.exists()) {cacheData.setUseLocalConfigInfo(false);return;}// 当isUseLocalConfigInfo=true 且 failover配置文件存在时,使用failover配置文件并更新内存中的配置if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);}}
什么情况下会使用failover本地配置
长轮询任务不仅仅向服务端发起请求获取配置发生变更的groupKey,而且执行了failover本地配置的监听。
ClientWorker.checkLocalConfig判断了当前CacheData是否需要使用failover本地配置,这类配置不会从服务端获取,只能在文件系统中手动更新。
当文件系统指定路径下的failover配置文件存在时,就会优先使用failover配置文件;当failover配置文件被删除时,又会切换为使用server端配置。同时,如果使用failover配置文件,此处会更新CacheData中的配置。
private void checkLocalConfig(CacheData cacheData) {final String dataId = cacheData.dataId;final String group = cacheData.group;final String tenant = cacheData.tenant;File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);// 当isUseLocalConfigInfo=false 且 failover配置文件存在时,使用failover配置文件,并更新内存中的配置if (!cacheData.isUseLocalConfigInfo() && path.exists()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);return;}// 当isUseLocalConfigInfo=true 且 failover配置文件不存在时,不使用failover配置文件if (cacheData.isUseLocalConfigInfo() && !path.exists()) {cacheData.setUseLocalConfigInfo(false);return;}// 当isUseLocalConfigInfo=true 且 failover配置文件存在时,使用failover配置文件并更新内存中的配置if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);}}
向Server端发起长轮询请求
针对所有非failover配置,通过ClientWorker.checkUpdateDataIds发起长轮询请求。
这里会统计所有非failover配置,并拼接请求业务报文:
- 有namespace的CacheData:dataId group md5 namespace
- 无namespace的CacheData:dataId group md5
此外,这里过滤出了正在初始化的CacheData,即CacheData刚构建,内部content仍然是本地snapshot版本,这部分配置将会有特殊处理。
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {StringBuilder sb = new StringBuilder();// 统计所有非failover的cacheData,拼接为"dataId group md5"或"dataId group md5 namespace"for (CacheData cacheData : cacheDatas) {if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}// 将首次监听的CacheData放入inInitializingCacheListif (cacheData.isInitializing()) {inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();// 实际发起请求return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}
checkUpdateConfigStr方法负责调用服务端/v1/cs/configs/listener长轮询接口,并解析报文返回。关注几个点:
- 请求参数Listening-Configs是上面拼接的业务报文
- 长轮询超时时间默认30s,放在请求头Long-Pulling-Timeout里
- 如果本次长轮询包含首次监听的配置项,在请求头设置Long-Pulling-Timeout-No-Hangup=true,让服务端立即返回本次轮询结果
- 服务端/v1/cs/configs/listener接口负责处理长轮询请求
- parseUpdateDataIdResponse方法会解析服务端返回报文
// ClientWorkerList<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {Map<String, String> params = new HashMap<String, String>(2);// 拼接的业务报文 key = Listening-Configsparams.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);Map<String, String> headers = new HashMap<String, String>(2);// 长轮询超时时间30sheaders.put("Long-Pulling-Timeout", "" + timeout);// 告诉服务端,本次长轮询包含首次监听的配置项,不要hold住请求,立即返回if (isInitializingCacheList) {headers.put("Long-Pulling-Timeout-No-Hangup", "true");}// 如果没有需要监听的if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {// readTimeout = 45slong readTimeoutMs = timeout + (long) Math.round(timeout >> 1);// 请求/v1/cs/configs/listenerHttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),readTimeoutMs);if (result.ok()) {setHealthServer(true);// 解析返回报文return parseUpdateDataIdResponse(result.getData());} else {setHealthServer(false);}} catch (Exception e) {setHealthServer(false);throw e;}return Collections.emptyList();}
parseUpdateDataIdResponse解析服务端返回报文,每行报文代表一个发生配置变化的groupKey。
private List<String> parseUpdateDataIdResponse(String response) {if (StringUtils.isBlank(response)) {return Collections.emptyList();}response = URLDecoder.decode(response, "UTF-8");List<String> updateList = new LinkedList<String>();// 按行分割for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {if (!StringUtils.isBlank(dataIdAndGroup)) {// 每行按空格分割,拼接为dataId+group+namespace 或 dataId+groupString[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);String dataId = keyArr[0];String group = keyArr[1];if (keyArr.length == 2) {updateList.add(GroupKey.getKey(dataId, group));} else if (keyArr.length == 3) {String tenant = keyArr[2];updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));} else {LOGGER.error();}}}return updateList;}
校验md5变化并触发监听器
收到服务端返回发生变化的配置项后,客户端会通过/v1/cs/configs接口获取对应的配置,并将配置保存到本地文件系统作为snapshot,这部分在配置查询部分看过,都是调用ClientWorker#getServerConfig方法。最后会将配置更新到CacheData的content字段中。
上述步骤处理完成后,通过CacheData.checkListenerMd5校验配置是否发生变更,并触发监听器。
// CacheData.java// 注册在这个CacheData配置上的监听器private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;// 配置的md5private volatile String md5;void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {// 比较CacheData中的md5与Listener中上次的md5是否相同if (!md5.equals(wrap.lastCallMd5)) {// 不相同则触发监听器safeNotifyListener(dataId, group, content, type, md5, wrap);}}}
safeNotifyListener方法是通知监听器的主逻辑,如果Listener配置了自己的Executor,将在自己配置的线程服务里执行监听逻辑,默认使用长轮询线程执行监听逻辑。
// CacheData.javaprivate void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;Runnable job = new Runnable() {@Overridepublic void run() {try {// 如果是AbstractSharedListener,把dataId和group放到它的成员变量里if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);}ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);// 给用户的钩子,忽略configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();// 触发监听器的receiveConfigInfo方法listener.receiveConfigInfo(contentTmp);// 如果是AbstractConfigChangeListener实例,触发receiveConfigChange方法if (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}// 更新监听器的上次md5值listenerWrap.lastCallMd5 = md5;} catch (NacosException ex) {LOGGER.error();} catch (Throwable t) {LOGGER.error();} finally {Thread.currentThread().setContextClassLoader(myClassLoader);}}};try {// 如果监听器配置了executor,使用配置的executor执行上面的任务if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {// 否则直接执行,也就是在长轮询线程中执行job.run();}} catch (Throwable t) {LOGGER.error();}}
这里根据Listener的类型会触发不同的监听方法。如果是普通的Listener,会触发receiveConfigInfo方法,得到一个String,是变更后的配置值。
public interface Listener {Executor getExecutor();void receiveConfigInfo(final String configInfo);}
如果是AbstractConfigChangeListener监听器,会触发receiveConfigChange方法,得到一个ConfigChangeEvent。
public abstract class AbstractConfigChangeListener extends AbstractListener {public abstract void receiveConfigChange(final ConfigChangeEvent event);// 注意这里receiveConfigInfo是个空实现@Overridepublic void receiveConfigInfo(final String configInfo) {}}
但是AbstractConfigChangeListener监听是有前提条件的,配置文件必须是yaml格式或properties格式,否则将不会触发Listener逻辑!见ConfigChangeHandler的parseChangeData方法,如果找不到解析器,会返回一个空的map。
public Map parseChangeData(String oldContent, String newContent, String type) throws IOException {for (ConfigChangeParser changeParser : this.parserList) {// 判断是否有可以解析这种配置文件类型,目前仅支持properties和yamlif (changeParser.isResponsibleFor(type)) {return changeParser.doParse(oldContent, newContent, type);}}return Collections.emptyMap();}
safeNotifyListener这部分逻辑中构造的ConfigChangeEvent将不会包含任何数据。
if (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);// 如果map为空,这里构造的event里的数据就也为空了,监听器感知不到配置变更ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}
public class ConfigChangeEvent {private final Map<String, ConfigChangeItem> data;public ConfigChangeEvent(Map<String, ConfigChangeItem> data) {this.data = data;}public ConfigChangeItem getChangeItem(String key) {return data.get(key);}public Collection<ConfigChangeItem> getChangeItems() {return data.values();}}
