一、Nacos Client


流程简介

构建Instance实例,将其封装成Http请求,然后发送至Server进行注册。通过BeatReactor,定时向Server上报心跳信息。
nacos-服务注册-client.png

设计要点

1、BeatInfo
如果是临时实例,则不会在Nacos服务端持久化存储,需要通过上报心跳的方式进行保活,如果一段时间内没有上报心跳,则会被Nacos服务端摘除。在被摘除后如果又开始上报心跳,则会重新将这个实例注册。持久化实例则会持久化到Nacos服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康。

2、getServerList

  1. private List<String> getServerList() {
  2. List<String> snapshot = serversFromEndpoint;
  3. if (!CollectionUtils.isEmpty(serverList)) {
  4. snapshot = serverList;
  5. }
  6. return snapshot;
  7. }

这里其实涉及到Nacos集群的概念了,serversFromEndpoint其实是向Nacos Server获取当前Nacos集群中的server列表,但是Nacos中用户设置的Nacos Server Addr的优先级是大于Nacos Client端去远程获取到的server列表的。

3、Nacos Server的选取

  1. if (servers != null && !servers.isEmpty()) {
  2. Random random = new Random(System.currentTimeMillis());
  3. int index = random.nextInt(servers.size());
  4. for (int i = 0; i < servers.size(); i++) {
  5. String server = servers.get(index);
  6. try {
  7. return callServer(api, params, body, server, method);
  8. } catch (NacosException e) {
  9. exception = e;
  10. if (NAMING_LOGGER.isDebugEnabled()) {
  11. NAMING_LOGGER.debug("request {} failed.", server, e);
  12. }
  13. }
  14. index = (index + 1) % servers.size();
  15. }
  16. }

如果Nacos Serve是集群模式部署的话,那么会采用随机策略选择一个Nacos Server Addr作为进行Instance注册的Http请求地址;如果请求失败的话则再次重新选取一个Nacos Server。


二、Nacos Server


流程简介

完成实例存入Service,同时会触发两个事件。其中一个事件是用于数据同步的,Nacos服务端会根据这个服务是否是临时对象的信息,使用Distro或者Raft协议进行同步,通知其他的Nacos节点该服务发生了变更;另一个事件则通知在该Nacos服务节点上订阅了该服务的订阅者,并根据订阅者信息,通过UDP的方式,把最新的服务列表推送到订阅者客户端上,这就完成了一次服务注册流程。
image.png

设计要点

由分级存储模型可知,健康检查、同步机制集中在Cluster层。
image.png

1、CanDistro**
实例注册时,DistroFilter会进行请求拦截,以判断当前服务是否由当前节点负责处理。如果不是则转发到相应的节点。具体判断逻辑由 DistroMapper 负责。

  1. /**
  2. * Judge whether current server is responsible for input service.
  3. *
  4. * @param serviceName service name
  5. * @return true if input service is response, otherwise false
  6. */
  7. public boolean responsible(String serviceName) {
  8. final List<String> servers = healthyList;
  9. if (!switchDomain.isDistroEnabled() || ApplicationUtils.getStandaloneMode()) {
  10. return true;
  11. }
  12. if (CollectionUtils.isEmpty(servers)) {
  13. // means distro config is not ready yet
  14. return false;
  15. }
  16. int index = servers.indexOf(ApplicationUtils.getLocalAddress());
  17. int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress());
  18. if (lastIndex < 0 || index < 0) {
  19. return true;
  20. }
  21. int target = distroHash(serviceName) % servers.size();
  22. // 类hash一致性算法
  23. return target >= index && target <= lastIndex;
  24. }


2、ClientBeatCheckTask
检查并更新客户端临时实例的状态,如果过期的话则从实例列表中移除,同时发布事件事件:

  • 服务变更 ServiceChangeEvent
  • 实例超时 InstanceHeartbeatTimeoutEvent

    1. @Override
    2. public void run() {
    3. try {
    4. if (!getDistroMapper().responsible(service.getName())) {
    5. return;
    6. }
    7. if (!getSwitchDomain().isHealthCheckEnabled()) {
    8. return;
    9. }
    10. List<Instance> instances = service.allIPs(true);
    11. // 标记:first set health status of instances:
    12. for (Instance instance : instances) {
    13. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
    14. if (!instance.isMarked()) {
    15. if (instance.isHealthy()) {
    16. instance.setHealthy(false);
    17. Loggers.EVT_LOG
    18. .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
    19. instance.getIp(), instance.getPort(), instance.getClusterName(),
    20. service.getName(), UtilsAndCommons.LOCALHOST_SITE,
    21. instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
    22. // 发布事件
    23. getPushService().serviceChanged(service);
    24. ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
    25. }
    26. }
    27. }
    28. }
    29. if (!getGlobalConfig().isExpireInstance()) {
    30. return;
    31. }
    32. // 清除:then remove obsolete instances:
    33. for (Instance instance : instances) {
    34. if (instance.isMarked()) {
    35. continue;
    36. }
    37. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
    38. // delete instance
    39. Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
    40. JacksonUtils.toJson(instance));
    41. deleteIp(instance);
    42. }
    43. }
    44. } catch (Exception e) {
    45. Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    46. }
    47. }

    3、HealthCheckTask
    目前只有持久化的实例才可以选定健康检查方式,非持久化的实例只会使用心跳进行健康检查。

    4、Notifier
    消息通知器。

    5、PushService
    消息推送器,通过UDP的方式把最新的服务列表推送到订阅者客户端上。

    1. private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    2. if (ackEntry == null) {
    3. Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
    4. return null;
    5. }
    6. if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
    7. Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
    8. ackMap.remove(ackEntry.key);
    9. udpSendTimeMap.remove(ackEntry.key);
    10. failedPush += 1;
    11. return ackEntry;
    12. }
    13. try {
    14. if (!ackMap.containsKey(ackEntry.key)) {
    15. totalPush++;
    16. }
    17. ackMap.put(ackEntry.key, ackEntry);
    18. udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
    19. Loggers.PUSH.info("send udp packet: " + ackEntry.key);
    20. // 通过UDP向Subscriber推送服务实例信息
    21. udpSocket.send(ackEntry.origin);
    22. ackEntry.increaseRetryTime();
    23. GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
    24. TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
    25. return ackEntry;
    26. } catch (Exception e) {
    27. Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
    28. ackEntry.origin.getAddress().getHostAddress(), e);
    29. ackMap.remove(ackEntry.key);
    30. udpSendTimeMap.remove(ackEntry.key);
    31. failedPush += 1;
    32. return null;
    33. }
    34. }

    6、DistroDelayTaskExecuteEngine
    Nacos Server 节点数据同步器