Repository: Author: GitHub Gitee
Introduction:
Author:杭州电子科技大学 管理学院 2016级 工商管理 唐涛
CreateTime:2020-5-25 14:12:00
UpdateTime:2020-5-25 14:52:09
Copyright: 唐涛 HOME | 首页 © 2020
Email: tangtao2099@outlook.com 16011324@hdu.edu.cn
创建 ConfigService
NacosFactory这个工厂负责创建ConfigServiceNamingServiceNamingMaintainService
public class NacosFactory {/*** Create config service** @param properties init param* @return config* @throws NacosException Exception*/public static ConfigService createConfigService(Properties properties) throws NacosException {return ConfigFactory.createConfigService(properties);}/*** Create config service** @param serverAddr server list* @return config* @throws NacosException Exception*/public static ConfigService createConfigService(String serverAddr) throws NacosException {return ConfigFactory.createConfigService(serverAddr);}
创建 ConfigService 时通过调用 ConfigFactory.createConfigService(properties) ,
而 ConfigFactory 是通过反射来创建 ConfigService。
其中 NacosConfigService implements ConfigService
源码如下:
/**
* Config Factory
* nacos
*
* @author Nacos
*/
public class ConfigFactory {
public static String nacosConfigServicePath = "com.alibaba.nacos.client.config.NacosConfigService";
/**
* @param properties : 传入的属性配置信息
* @return com.alibaba.nacos.api.config.ConfigService
* @throws NacosException Exception
* @author TangTao tangtao2099@outlook.com
* @website https://www.promiselee.cn/tao
* @createTime 2020/5/25 14:12
* @updateTime 2020/5/25 14:12
* @methodName createConfigService
* @note 通过反射创建 {@link com.alibaba.nacos.client.config.NacosConfigService}
*/
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
// 通过反射创建 NacosConfigService
Class<?> driverImplClass = Class.forName(nacosConfigServicePath);
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
/**
* Create Config
* 自定义 serverAddr
*
* @param serverAddr serverList
* @return Config
* @throws ConfigService Exception
*/
public static ConfigService createConfigService(String serverAddr) throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
return createConfigService(properties);
}
}
NacosConfigService 的构造函数
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
这里创建了 new ClientWorker(agent, configFilterChainManager, properties) ;
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
// 此线程工作量较小 核心数为 1
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 此线程 核心数为cpu核数 最大integer max 此线程任务量大 负责长轮询
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
// !刚创建线程池的时候 先丢一个守护线程进去
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 每隔10ms 检测一次配置信息
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
通过阅读源码,可知主要创建了两个
newScheduledThreadPool线程池,一个负责Client Worker,实际上就是通过定时10ms来负责检查配置信息。 另一个线程池时负责长轮询的,负责Client Service
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
/**
* groupKey -> cacheData
*/
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());
