Repository: Author: GitHub Gitee

Introduction:

Author:杭州电子科技大学 管理学院 2016级 工商管理 唐涛

CreateTime:2020-5-25 14:12:00

UpdateTime:2020-5-25 14:52:09

Copyright: 唐涛 HOME | 首页 © 2020 Nacos 配置中心实现 - 图1 Nacos 配置中心实现 - 图2

Email: tangtao2099@outlook.com 16011324@hdu.edu.cn

Link: 知乎 GitHub 语雀

创建 ConfigService

NacosFactory 这个工厂负责创建 ConfigService NamingService NamingMaintainService

  1. public class NacosFactory {
  2. /**
  3. * Create config service
  4. *
  5. * @param properties init param
  6. * @return config
  7. * @throws NacosException Exception
  8. */
  9. public static ConfigService createConfigService(Properties properties) throws NacosException {
  10. return ConfigFactory.createConfigService(properties);
  11. }
  12. /**
  13. * Create config service
  14. *
  15. * @param serverAddr server list
  16. * @return config
  17. * @throws NacosException Exception
  18. */
  19. public static ConfigService createConfigService(String serverAddr) throws NacosException {
  20. return ConfigFactory.createConfigService(serverAddr);
  21. }

创建 ConfigService 时通过调用 ConfigFactory.createConfigService(properties)

ConfigFactory 是通过反射来创建 ConfigService

其中 NacosConfigService implements ConfigService

源码如下:

/**
 * Config Factory
 * nacos
 *
 * @author Nacos
 */
public class ConfigFactory {

    public static String nacosConfigServicePath = "com.alibaba.nacos.client.config.NacosConfigService";

    /**
     * @param properties : 传入的属性配置信息
     * @return com.alibaba.nacos.api.config.ConfigService
     * @throws NacosException Exception
     * @author TangTao tangtao2099@outlook.com
     * @website https://www.promiselee.cn/tao
     * @createTime 2020/5/25 14:12
     * @updateTime 2020/5/25 14:12
     * @methodName createConfigService
     * @note 通过反射创建 {@link com.alibaba.nacos.client.config.NacosConfigService}
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {

        try {
            // 通过反射创建 NacosConfigService
            Class<?> driverImplClass = Class.forName(nacosConfigServicePath);
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }

    /**
     * Create Config
     * 自定义 serverAddr
     *
     * @param serverAddr serverList
     * @return Config
     * @throws ConfigService Exception
     */
    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return createConfigService(properties);
    }

}

NacosConfigService 的构造函数

 public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        initNamespace(properties);
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }

这里创建了 new ClientWorker(agent, configFilterChainManager, properties)

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter

    init(properties);

    // 此线程工作量较小 核心数为 1
    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    // 此线程 核心数为cpu核数 最大integer max 此线程任务量大 负责长轮询
    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        // !刚创建线程池的时候 先丢一个守护线程进去 
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    // 每隔10ms 检测一次配置信息
    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

通过阅读源码,可知主要创建了两个 newScheduledThreadPool 线程池,一个负责 Client Worker ,实际上就是通过定时 10ms 来负责检查配置信息。 另一个线程池时负责长轮询的,负责 Client Service

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

/**
 * groupKey -> cacheData
 */
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
    new HashMap<String, CacheData>());