在这里,得取分两个概念,一个instance即一个实例,也就是一个Client,而ClientManager是所有实例的管理器

先看看ClientManager

ClientManager是一个接口

首先先看看这个接口定义了什么规范

  1. public interface ClientManager {
  2. boolean clientConnected(String clientId, ClientAttributes attributes);
  3. boolean clientConnected(Client client);
  4. boolean syncClientConnected(String clientId, ClientAttributes attributes);
  5. boolean clientDisconnected(String clientId);
  6. Client getClient(String clientId);
  7. boolean contains(final String clientId);
  8. Collection<String> allClientId();
  9. boolean isResponsibleClient(Client client);
  10. boolean verifyClient(String clientId);
  11. }
  • 客户端管理工具嘛,所以自然是客户端的CRUD
  • 接下来看看都有什么接口实现了它

image.png
本篇,就以EphemeralIpPortClientManager为例子来看,因为这个是短暂临时的客户端,也就是我们的服务上线掉线啥的就是它管理的

EphemeralIpPortClientManager

谈谈clients这个客户端的家

  1. @Component("ephemeralIpPortClientManager")
  2. public class EphemeralIpPortClientManager implements ClientManager {
  3. private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
  4. // ignore...
  5. }
  • 首先他是一个Component,那么我就理解他是一个单例的spring bean
  • 其次映入眼帘的就是一个ConcurrentMap,说明这肯定是一个重要的东西,于是debug

这个例子里,我起了两个服务,分别为 9998端口的server1 9999端口的server2 ,如果看了上一篇的心跳机制,可能会联想到每次客户端上报心跳都会去ClientManager的这个clients里面获取IpPortBasedClient,所以我就在这个地方打断点,看看clients里面有什么??
image.png

  • 可以看到,这里有四个客户端!!而不是两个,奇怪的点是,明明我服务起了两个服务,但是为什么却有四个客户端呢???有一个9999和9998我能“李姐”,但是另外两个是啥?

于是翻看调用栈信息,可以看到:
image.png
InstanceController调用来的,于是接着看看是什么接口来这里注册的?
image.png

  • 这里是调用了**/list**接口,意思就是查询指定服务的所有实例,但是主要的地方还是return那个地方执行了listInstance那里,这里贴一下里面的源码

    1. @Override
    2. public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
    3. boolean healthOnly) {
    4. // For adapt 1.X subscribe logic
    5. if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
    6. String clientId = IpPortBasedClient.getClientId(subscriber.getAddrStr(), true);
    7. createIpPortClientIfAbsent(clientId);
    8. clientOperationService.subscribeService(service, subscriber, clientId);
    9. }
    10. // ignore..
    11. }
    12. com.alibaba.nacos.naming.core#InstanceOperatorClientImpl
    13. private void createIpPortClientIfAbsent(String clientId) {
    14. if (!clientManager.contains(clientId)) {
    15. clientManager.clientConnected(clientId, new ClientAttributes());
    16. }
    17. }
  • 这里的第7行,**createIpPortClientIfAbsent**会在clientManager里面创建新的client对象(如果clients那个ConcurrentHashMap里面不存在的话)

  • **clientInfo.version.compareTo**_**(**_**VersionUtil.**_**parseVersion****(**_**switchDomain.getPushJavaVersion**_**())) **_ 这行代码是第5行 canEnablePush做的判断,逻辑是获取版本号的之类的,对应上面注释的逻辑,要用于适配1.x的逻辑

所以这里破案了,为何明明有两个服务,但是 EphemeralIpPortClientManager 却有四个客户端,其中两个就是对应的我们的客户端实例,也就是在registerInstance的时候创建的客户端,当然这两个客户端也会在上报心跳的的时候来到clients这个map里来校验,因为心跳信息里面也保存了客户端的信息。另外两个,我的理解是客户端拉取当前服务的实例的进程,即每个实例会有另外一个进程来拉取该服务集群的其他的客户端的信息(不过这里的注释写的是 For adapt 1.X subscribe logic ,本人客户端使用1.x, 源码调试使用的2.x进行的调试 在1.x的客户端代码中是有一个 HostReactor 来拉取的list的 , 不过2.x的代码中并不存在这个类,结合官方的注释,可能这个 HostReactor 已经在2.x的版本中进行了移除,不过兼容1.x的版本,所以依然是向clients里面添加了)

所以个人理解,clients这个ConcurrentHashMap里面保存的是客户端实例,对与1.x而言,这个clients保存的客户端概念不止是实例,还有实例内部的一些其他的线程(就像根据服务名拉取实例信息的/list接口一样),但是到了2.x(本人拉的2.0.3)而言,客户端的代码里面已经移除了 HostReactor 这个类了,可能对于2.x而言,客户端只是一个单单的客户端,不再算上其他的线程,当然这里做了兼容处理,所以依然是想client里面添加了。

当然上面扯了这么多,只有一点比较重要,上面那堆瞎扯的纯属个人猜测,就是ClientManager是保存客户端的管理者,这里就站在2.x的角度理解,其内部的clients保存的就是客户端的实例,该实例是对于IP+端口的维度而言, 被保存在 **private final ConcurrentMap**_**<**_**String, IpPortBasedClient**_**> **_**clients = new ConcurrentHashMap**_**<>()**_**;**
这里贴一下clients的entry:
image.png

过期的客户端清洁器 ExpiredClientCleaner

构造器初始化的时候提交的一个周期性为5s的客户端清理线程

  1. @Component("ephemeralIpPortClientManager")
  2. public class EphemeralIpPortClientManager implements ClientManager {
  3. // ignore...
  4. public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
  5. this.distroMapper = distroMapper;
  6. GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0,
  7. Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
  8. clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
  9. }
  10. private static class ExpiredClientCleaner implements Runnable {
  11. private final EphemeralIpPortClientManager clientManager;
  12. private final SwitchDomain switchDomain;
  13. public ExpiredClientCleaner(EphemeralIpPortClientManager clientManager, SwitchDomain switchDomain) {
  14. this.clientManager = clientManager;
  15. this.switchDomain = switchDomain;
  16. }
  17. @Override
  18. public void run() {
  19. long currentTime = System.currentTimeMillis();
  20. for (String each : clientManager.allClientId()) {
  21. IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(each);
  22. if (null != client && isExpireClient(currentTime, client)) {
  23. clientManager.clientDisconnected(each);
  24. }
  25. }
  26. }
  27. // ignore..
  28. }
  29. }

这里贴了EphemeralIpPortClientManager的构造器,从构造器可以得知:

  • 第6行 传入了clientManager这个对象,同时向 ScheduledExecutorService 线程池提交了一个任务(这是一个周期性的任务,即固定了一段时间后周而复始的执行,时间是5s一次)

    1. public static void scheduleExpiredClientCleaner(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
    2. EXPIRED_CLIENT_CLEANER_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
    3. }

    那么这个任务做了什么呢?

  • 看上面的run方法,遍历拿到所有的客户端的信息,校验这个客户端是否过期 **noUpdatedTime = currentTime - client.getLastUpdatedTime**_**()**_ 如果过期,则通过传入的clientManager移除

所以,就能够“李姐”,构造器初始化的时候提交的一个周期性为5s的客户端清理线程这句话了

DistroMapper

  1. @Component("distroMapper")
  2. public class DistroMapper extends MemberChangeListener {
  3. private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
  4. private final DistroMapper distroMapper;
  5. private final ClientFactory<IpPortBasedClient> clientFactory;
  6. public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
  7. this.distroMapper = distroMapper;
  8. GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0,
  9. Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
  10. clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
  11. }
  12. }

这个是通过构造器注入的一个bean,里面保存了nacos服务的一些信息,和nacos集群也有点关系,之后有时间会往里面扒,这里就不细化了

Client

Client接口

  1. public interface Client {
  2. /**
  3. * Get the unique id of current client.
  4. *
  5. * @return id of client
  6. */
  7. String getClientId();
  8. /**
  9. * Whether is ephemeral of current client.
  10. *
  11. * @return true if client is ephemeral, otherwise false
  12. */
  13. boolean isEphemeral();
  14. /**
  15. * Set the last time for updating current client as current time.
  16. */
  17. void setLastUpdatedTime();
  18. /**
  19. * Get the last time for updating current client.
  20. *
  21. * @return last time for updating
  22. */
  23. long getLastUpdatedTime();
  24. /**
  25. * Add a new instance for service for current client.
  26. *
  27. * @param service publish service
  28. * @param instancePublishInfo instance
  29. * @return true if add successfully, otherwise false
  30. */
  31. boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
  32. /**
  33. * Remove service instance from client.
  34. *
  35. * @param service service of instance
  36. * @return instance info if exist, otherwise {@code null}
  37. */
  38. InstancePublishInfo removeServiceInstance(Service service);
  39. /**
  40. * Get instance info of service from client.
  41. *
  42. * @param service service of instance
  43. * @return instance info
  44. */
  45. InstancePublishInfo getInstancePublishInfo(Service service);
  46. /**
  47. * Get all published service of current client.
  48. *
  49. * @return published services
  50. */
  51. Collection<Service> getAllPublishedService();
  52. /**
  53. * Add a new subscriber for target service.
  54. *
  55. * @param service subscribe service
  56. * @param subscriber subscriber
  57. * @return true if add successfully, otherwise false
  58. */
  59. boolean addServiceSubscriber(Service service, Subscriber subscriber);
  60. /**
  61. * Remove subscriber for service.
  62. *
  63. * @param service service of subscriber
  64. * @return true if remove successfully, otherwise false
  65. */
  66. boolean removeServiceSubscriber(Service service);
  67. /**
  68. * Get subscriber of service from client.
  69. *
  70. * @param service service of subscriber
  71. * @return subscriber
  72. */
  73. Subscriber getSubscriber(Service service);
  74. /**
  75. * Get all subscribe service of current client.
  76. *
  77. * @return subscribe services
  78. */
  79. Collection<Service> getAllSubscribeService();
  80. /**
  81. * Generate sync data.
  82. *
  83. * @return sync data
  84. */
  85. ClientSyncData generateSyncData();
  86. /**
  87. * Whether current client is expired.
  88. *
  89. * @param currentTime unified current timestamp
  90. * @return true if client has expired, otherwise false
  91. */
  92. boolean isExpire(long currentTime);
  93. /**
  94. * Release current client and release resources if neccessary.
  95. */
  96. void release();
  97. }

可以先看看这里有抽象了哪些接口出来

IpPortBasedClient

看过之前的文章可能会对这个类有印象,服务注册的时候,或者心跳上报的时候,都会从ClientManager里面取出它来,我认为它就是客户端的载体,可以看看下图
image.png

IpPortBasedClient是继承自AbstractClient,实现了Client接口

  1. public abstract class AbstractClient implements Client {
  2. protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
  3. protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
  4. protected volatile long lastUpdatedTime;
  5. // ignore..
  6. }
  7. public class IpPortBasedClient extends AbstractClient {
  8. public class IpPortBasedClient extends AbstractClient {
  9. public static final String ID_DELIMITER = "#";
  10. private final String clientId;
  11. private final boolean ephemeral;
  12. private final String responsibleId;
  13. private ClientBeatCheckTaskV2 beatCheckTask;
  14. private HealthCheckTaskV2 healthCheckTaskV2;
  15. public IpPortBasedClient(String clientId, boolean ephemeral) {
  16. this.ephemeral = ephemeral;
  17. this.clientId = clientId;
  18. this.responsibleId = getResponsibleTagFromId();
  19. }
  20. private String getResponsibleTagFromId() {
  21. int index = clientId.indexOf(IpPortBasedClient.ID_DELIMITER);
  22. return clientId.substring(0, index);
  23. }
  24. public static String getClientId(String address, boolean ephemeral) {
  25. return address + ID_DELIMITER + ephemeral;
  26. }
  27. @Override
  28. public String getClientId() {
  29. return clientId;
  30. }
  31. @Override
  32. public boolean isEphemeral() {
  33. return ephemeral;
  34. }
  35. public String getResponsibleId() {
  36. return responsibleId;
  37. }
  38. @Override
  39. public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
  40. return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
  41. }
  42. @Override
  43. public boolean isExpire(long currentTime) {
  44. return isEphemeral() && getAllPublishedService().isEmpty() && currentTime - getLastUpdatedTime() > ClientConfig
  45. .getInstance().getClientExpiredTime();
  46. }
  47. public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
  48. return publishers.values();
  49. }
  50. @Override
  51. public void release() {
  52. super.release();
  53. if (ephemeral) {
  54. HealthCheckReactor.cancelCheck(beatCheckTask);
  55. } else {
  56. healthCheckTaskV2.setCancelled(true);
  57. }
  58. }
  59. private HealthCheckInstancePublishInfo parseToHealthCheckInstance(InstancePublishInfo instancePublishInfo) {
  60. HealthCheckInstancePublishInfo result;
  61. if (instancePublishInfo instanceof HealthCheckInstancePublishInfo) {
  62. result = (HealthCheckInstancePublishInfo) instancePublishInfo;
  63. } else {
  64. result = new HealthCheckInstancePublishInfo();
  65. result.setIp(instancePublishInfo.getIp());
  66. result.setPort(instancePublishInfo.getPort());
  67. result.setHealthy(instancePublishInfo.isHealthy());
  68. result.setCluster(instancePublishInfo.getCluster());
  69. result.setExtendDatum(instancePublishInfo.getExtendDatum());
  70. }
  71. if (!ephemeral) {
  72. result.initHealthCheck();
  73. }
  74. return result;
  75. }
  76. /**
  77. * Init client.
  78. */
  79. public void init() {
  80. if (ephemeral) {
  81. beatCheckTask = new ClientBeatCheckTaskV2(this);
  82. HealthCheckReactor.scheduleCheck(beatCheckTask);
  83. } else {
  84. healthCheckTaskV2 = new HealthCheckTaskV2(this);
  85. HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
  86. }
  87. }
  88. /**
  89. * Purely put instance into service without publish events.
  90. */
  91. public void putServiceInstance(Service service, InstancePublishInfo instance) {
  92. if (null == publishers.put(service, parseToHealthCheckInstance(instance))) {
  93. MetricsMonitor.incrementInstanceCount();
  94. }
  95. }
  96. }
  97. }
  • IpPortBasedClient是客户端的实体,同时保存了响应的一些信息
  • 大部分的方法都是获取客户端的信息,如clientId,是否是短暂在线等
  • 对于AbstractClient这个类来说,我是有点疑惑的。publishs在debug的时候可以发现,这里的key一直是客户端的服务名,而value是该客户端实例的一些信息,那么问题就来了,对于客户端维度而言,每一个客户端内部都保存一个这样的publishs的map,而每一个客户端实例对于服务而言是1对1的关系,那这个publishs不应该恒为1吗? 就像server1这个客户端,内部保存的publishmap不应该就是只有一个entry,并且为 **server1---->server1Info**,所以这个点还是有点不“李姐”的
  • 对于Subscriber,暂时还没理清是什么…..

接着我们看看**IpPortBasedClient**是何时被new出来的:

  1. public class EphemeralIpPortClientManager implements ClientManager {
  2. @Override
  3. public boolean clientConnected(final Client client) {
  4. clients.computeIfAbsent(client.getClientId(), s -> {
  5. Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
  6. IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
  7. ipPortBasedClient.init();
  8. return ipPortBasedClient;
  9. });
  10. return true;
  11. }
  12. @Override
  13. public boolean clientConnected(String clientId, ClientAttributes attributes) {
  14. return clientConnected(clientFactory.newClient(clientId, attributes));
  15. }
  16. }
  • 从这块代码中可以知道,IpPortBasedClient是在客户端连接的时候被new出来,同时保存在ClientManager里面了,也就是我们的客户端管理器
  • 同时,调用了其init()方法

    1. /**
    2. * Init client.
    3. */
    4. public void init() {
    5. if (ephemeral) {
    6. beatCheckTask = new ClientBeatCheckTaskV2(this);
    7. HealthCheckReactor.scheduleCheck(beatCheckTask);
    8. } else {
    9. healthCheckTaskV2 = new HealthCheckTaskV2(this);
    10. HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
    11. }
    12. }
  • 向线程池提交了一个**ClientBeatCheckTaskV2**任务,**GlobalExecutor.**_**scheduleNamingHealth****(**_**wrapperTask, 5000, 5000, TimeUnit.**_**MILLISECONDS)**_是一个延迟5s,周期为5s的周期性的线程任务

接下来看看这个ClientBeatCheckTaskV2是个什么任务

  1. public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
  2. // ignore...
  3. @Override
  4. public void doHealthCheck() {
  5. try {
  6. Collection<Service> services = client.getAllPublishedService();
  7. for (Service each : services) {
  8. HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client
  9. .getInstancePublishInfo(each);
  10. interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
  11. }
  12. } catch (Exception e) {
  13. Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
  14. }
  15. }
  16. }
  17. public class InstanceBeatCheckTask implements Interceptable {
  18. // ignore...
  19. @Override
  20. public void passIntercept() {
  21. for (InstanceBeatChecker each : CHECKERS) {
  22. each.doCheck(client, service, instancePublishInfo);
  23. }
  24. }
  25. }
  26. public class UnhealthyInstanceChecker implements InstanceBeatChecker {
  27. // ignore..
  28. @Override
  29. public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
  30. if (instance.isHealthy() && isUnhealthy(service, instance)) {
  31. changeHealthyStatus(client, service, instance);
  32. }
  33. }
  34. private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
  35. instance.setHealthy(false);
  36. Loggers.EVT_LOG
  37. .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),
  38. instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,
  39. instance.getLastHeartBeatTime());
  40. NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
  41. NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
  42. }
  43. }
  • 上面放了三个类的三个方法
  • 第一个就是我们的任务,执行doHealthCheck(),即健康检查,在第10行,对服务进行健康检查
  • 这里用了一个拦截器链(玩的有点花),不过实际执行的还是InstanceBeatCheckTask的passIntercept
  • 最终实际执行的还是 UnhealthyInstanceChecker 的 docheck方法,其实际逻辑是判断实例的健康状态,也就是**System.**_**currentTimeMillis() **_**- instance.getLastHeartBeatTime**_**() **_**> beatTimeout**根据最后一次心跳的间隔时间来进行的判断,如果不健康则修改实例的健康状态**changeHealthyStatus**,并且发送服务和客户端的变更事件

既然是检查最后一次心跳来校验健康状态,那么这里就回顾一下上一章心跳里更新实例的最后一次心跳时间的代码

  1. public class ClientBeatProcessorV2 implements BeatProcessor {
  2. // .....
  3. @Override
  4. public void run() {
  5. if (Loggers.EVT_LOG.isDebugEnabled()) {
  6. Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
  7. }
  8. String ip = rsInfo.getIp();
  9. int port = rsInfo.getPort();
  10. String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
  11. String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
  12. Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
  13. HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
  14. if (instance.getIp().equals(ip) && instance.getPort() == port) {
  15. if (Loggers.EVT_LOG.isDebugEnabled()) {
  16. Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
  17. }
  18. instance.setLastHeartBeatTime(System.currentTimeMillis());
  19. if (!instance.isHealthy()) {
  20. instance.setHealthy(true);
  21. Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
  22. rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
  23. NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
  24. NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
  25. }
  26. }
  27. }
  28. }
  • ClientBeatProcessorV2,也就是上一章心跳里面,发送心跳的时候,执行的异步任务,其在第18行修改的实例的心跳时间


略微总结一下

IpPortBasedClient是客户端在ClientManager保存的实体,里面有客户端信息,同时其被创建的初期(其创建是在客户端注册的时候去ClientManager找,如果没找到则通过clientConnected方法创建),会有一个周期性的异步5s的任务,根据最后一次心跳时间来检测实例的健康状态(心跳时间就是客户端上报心跳信息会进行更改)