Java SpringCloud Nacos

Nacos 服务注册需要具备的能力:

  • 服务提供者把自己的协议地址注册到Nacos server
  • 服务消费者需要从Nacos Server上去查询服务提供者的地址(根据服务名称)
  • Nacos Server需要感知到服务提供者的上下线的变化
  • 服务消费者需要动态感知到Nacos Server端服务地址的变化

作为注册中心所需要的能力大多如此,需要做的是理解各种注册中心的独有特性,总结他们的共性。

Nacos的实现原理:

下面先来了解一下 Nacos 注册中心的实现原理,通过下面这张图来说明。
Nacos服务注册的原理 - 图1
Nacos 注册中心的实现原理
图中的流程是大家所熟悉的,不同的是在Nacos 中,服务注册时在服务端本地会通过轮询注册中心集群节点地址进行服务得注册,在注册中心上,即Nacos Server上采用了Map保存实例信息,当然配置了持久化的服务会被保存到数据库中,在服务的调用方,为了保证本地服务实例列表的动态感知,Nacos与其他注册中心不同的是,采用了 Pull/Push同时运作的方式。通过这些对Nacos注册中心的原理有了一定的了解。从源码层面去验证这些理论知识。

Nacos的源码分析(结合spring-cloud-alibaba +dubbo +nacos 的整合):

「服务注册的流程:」
在基于Dubbo服务发布的过程中, 自动装配是走的事件监听机制,在 DubboServiceRegistrationNonWebApplicationAutoConfiguration 这个类中,这个类会监听 ApplicationStartedEvent 事件,这个事件是spring boot在2.0新增的,就是当spring boot应用启动完成之后会发布这个时间。而此时监听到这个事件之后,会触发注册的动作。

  1. @EventListener(ApplicationStartedEvent.class)
  2. public void onApplicationStarted() {
  3. setServerPort();
  4. register();
  5. }
  6. private void register() {
  7. if (registered) {
  8. return;
  9. }
  10. serviceRegistry.register(registration);
  11. registered = true;
  12. }

serviceRegistry是 spring-cloud 提供的接口实现(org.springframework.cloud.client.serviceregistry.ServiceRegistry),很显然注入的实例是:NacosServiceRegistry
Nacos服务注册的原理 - 图2
NacosServiceRegistry
然后进入到实现类的注册方法:

  1. @Override
  2. public void register(Registration registration) {
  3. if (StringUtils.isEmpty(registration.getServiceId())) {
  4. log.warn("No service to register for nacos client...");
  5. return;
  6. }
  7. //对应当前应用的application.name
  8. String serviceId = registration.getServiceId();
  9. //表示nacos上的分组配置
  10. String group = nacosDiscoveryProperties.getGroup();
  11. //表示服务实例信息
  12. Instance instance = getNacosInstanceFromRegistration(registration);
  13. try {
  14. //通过命名服务进行注册
  15. namingService.registerInstance(serviceId, group, instance);
  16. log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
  17. instance.getIp(), instance.getPort());
  18. }
  19. catch (Exception e) {
  20. log.error("nacos registry, {} register failed...{},", serviceId,
  21. registration.toString(), e);
  22. // rethrow a RuntimeException if the registration is failed.
  23. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
  24. rethrowRuntimeException(e);
  25. }
  26. }

接下去就是开始注册实例,主要做两个动作

  1. 如果当前注册的是临时节点,则构建心跳信息,通过beat反应堆来构建心跳任务
  2. 调用registerService发起服务注册

    1. @Override
    2. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    3. ////是否是临时节点,如果是临时节点,则构建心跳信息
    4. if (instance.isEphemeral()) {
    5. BeatInfo beatInfo = new BeatInfo();
    6. beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
    7. beatInfo.setIp(instance.getIp());
    8. beatInfo.setPort(instance.getPort());
    9. beatInfo.setCluster(instance.getClusterName());
    10. beatInfo.setWeight(instance.getWeight());
    11. beatInfo.setMetadata(instance.getMetadata());
    12. beatInfo.setScheduled(false);
    13. //beatReactor, 添加心跳信息进行处理
    14. beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    15. }
    16. //调用服务代理类进行注册
    17. serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    18. }

    然后调用 NamingProxy 的注册方法进行注册,代码逻辑很简单,构建请求参数,发起请求。 ```java public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info(“[REGISTER-SERVICE] {} registering service {} with instance: {}”,

    1. namespaceId, serviceName, instance);

    final Map params = new HashMap(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put(“ip”, instance.getIp()); params.put(“port”, String.valueOf(instance.getPort())); params.put(“weight”, String.valueOf(instance.getWeight())); params.put(“enable”, String.valueOf(instance.isEnabled())); params.put(“healthy”, String.valueOf(instance.isHealthy())); params.put(“ephemeral”, String.valueOf(instance.isEphemeral())); params.put(“metadata”, JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

  1. 往下走就会发现上面提到的,服务在进行注册的时候会轮询配置好的注册中心的地址:
  2. ```java
  3. public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
  4. params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
  5. if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
  6. throw new IllegalArgumentException("no server available");
  7. }
  8. Exception exception = new Exception();
  9. //如果服务地址不为空
  10. if (servers != null && !servers.isEmpty()) {
  11. //随机获取一台服务器节点
  12. Random random = new Random(System.currentTimeMillis());
  13. int index = random.nextInt(servers.size());
  14. // 遍历服务列表
  15. for (int i = 0; i < servers.size(); i++) {
  16. String server = servers.get(index);//获得索引位置的服务节点
  17. try {//调用指定服务
  18. return callServer(api, params, server, method);
  19. } catch (NacosException e) {
  20. exception = e;
  21. NAMING_LOGGER.error("request {} failed.", server, e);
  22. } catch (Exception e) {
  23. exception = e;
  24. NAMING_LOGGER.error("request {} failed.", server, e);
  25. }
  26. //轮询
  27. index = (index + 1) % servers.size();
  28. }
  29. // ..........
  30. }

最后通过 callServer(api, params, server, method) 发起调用,这里通过 JSK自带的 HttpURLConnection 进行发起调用。可以通过断点的方式来看到这里的请求参数:
Nacos服务注册的原理 - 图3
HttpURLConnection 进行发起调用
期间可能会有多个 GET 的请求获取服务列表,是正常的,会发现有如上的一个请求,会调用 http://192.168.200.1:8848/nacos/v1/ns/instance 这个地址。那么接下去就是Nacos Server 接受到服务端的注册请求的处理流程。需要下载Nacos Server 源码,源码下载可以参考官方文档,本文不做过多阐述。
「Nacos服务端的处理:」
服务端提供了一个InstanceController类,在这个类中提供了服务注册相关的API,而服务端发起注册时,调用的接口是:[post]: /nacos/v1/ns/instance ,serviceName: 代表客户端的项目名称 ,namespace: nacos 的namespace。

  1. @CanDistro
  2. @PostMapping
  3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  4. public String register(HttpServletRequest request) throws Exception {
  5. final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  6. final String namespaceId = WebUtils
  7. .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  8. // 从请求中解析出instance实例
  9. final Instance instance = parseInstance(request);
  10. serviceManager.registerInstance(namespaceId, serviceName, instance);
  11. return "ok";
  12. }

然后调用 ServiceManager 进行服务的注册

  1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
  2. //创建一个空服务,在Nacos控制台服务列表展示的服务信息,实际上是初始化一个serviceMap,它是一个ConcurrentHashMap集合
  3. createEmptyService(namespaceId, serviceName, instance.isEphemeral());
  4. //从serviceMap中,根据namespaceId和serviceName得到一个服务对象
  5. Service service = getService(namespaceId, serviceName);
  6. if (service == null) {
  7. throw new NacosException(NacosException.INVALID_PARAM,
  8. "service not found, namespace: " + namespaceId + ", service: " + serviceName);
  9. }
  10. //调用addInstance创建一个服务实例
  11. addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
  12. }

在创建空的服务实例的时候发现了存储实例的map:

  1. public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
  2. throws NacosException {
  3. //从serviceMap中获取服务对象
  4. Service service = getService(namespaceId, serviceName);
  5. if (service == null) {//如果为空。则初始化
  6. Loggers.SRV\_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
  7. service = new Service();
  8. service.setName(serviceName);
  9. service.setNamespaceId(namespaceId);
  10. service.setGroupName(NamingUtils.getGroupName(serviceName));
  11. // now validate the service. if failed, exception will be thrown
  12. service.setLastModifiedMillis(System.currentTimeMillis());
  13. service.recalculateChecksum();
  14. if (cluster != null) {
  15. cluster.setService(service);
  16. service.getClusterMap().put(cluster.getName(), cluster);
  17. }
  18. service.validate();
  19. putServiceAndInit(service);
  20. if (!local) {
  21. addOrReplaceService(service);
  22. }
  23. }

getService 方法中发现了Map:

  1. /*
  2. * Map(namespace, Map(group::serviceName, Service)).
  3. */
  4. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

通过注释可以知道,Nacos是通过不同的 namespace 来维护服务的,而每个namespace下有不同的group,不同的group下才有对应的Service ,再通过这个 serviceName 来确定服务实例。
第一次进来则会进入初始化,初始化完会调用 putServiceAndInit

  1. private void putServiceAndInit(Service service) throws NacosException {
  2. putService(service);//把服务信息保存到serviceMap集合
  3. service.init();//建立心跳检测机制
  4. //实现数据一致性监听,ephemeral(标识服务是否为临时服务,默认是持久化的,也就是true)=true表示采用raft协议,false表示采用Distro
  5. consistencyService
  6. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
  7. consistencyService
  8. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
  9. Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
  10. }

获取到服务以后把服务实例添加到集合中,然后基于一致性协议进行数据的同步。然后调用 addInstance

  1. public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
  2. throws NacosException {
  3. // 组装key
  4. String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
  5. // 获取刚刚组装的服务
  6. Service service = getService(namespaceId, serviceName);
  7. synchronized (service) {
  8. List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
  9. Instances instances = new Instances();
  10. instances.setInstanceList(instanceList);
  11. // 也就是上一步实现监听的类里添加注册服务
  12. consistencyService.put(key, instances);
  13. }
  14. }

然后给服务注册方发送注册成功的响应,结束服务注册流程。