细化分为两个步骤,一个是客户端,一个是服务端

客户端

代码地址

如果你拉了nacos的源码(本人是拿的2.0.3的版本)的话,客户端发起服务注册代码地址如下:
com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#registerService

注册代码剖析

  1. @Override
  2. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  3. NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
  4. instance);
  5. String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  6. if (instance.isEphemeral()) {
  7. BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  8. beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  9. }
  10. final Map<String, String> params = new HashMap<String, String>(32);
  11. params.put(CommonParams.NAMESPACE_ID, namespaceId);
  12. params.put(CommonParams.SERVICE_NAME, groupedServiceName);
  13. params.put(CommonParams.GROUP_NAME, groupName);
  14. params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
  15. params.put(IP_PARAM, instance.getIp());
  16. params.put(PORT_PARAM, String.valueOf(instance.getPort()));
  17. params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
  18. // 2.0.3的客户端代码里面这里写的不规范有点,因为上面是已经定义过了的
  19. params.put("enable", String.valueOf(instance.isEnabled()));
  20. params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
  21. params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
  22. params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
  23. reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
  24. }
  • groupedServiceName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName
  • **instance.isEphemeral()**判断当前实例是临时实例还是持久实例,其默认值是private boolean ephemeral = true;即临时实例,如果是临时实例的话,就会添加其心跳信息

    1. public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    2. BeatInfo beatInfo = new BeatInfo();
    3. beatInfo.setServiceName(groupedServiceName);
    4. beatInfo.setIp(instance.getIp());
    5. beatInfo.setPort(instance.getPort());
    6. beatInfo.setCluster(instance.getClusterName());
    7. beatInfo.setWeight(instance.getWeight());
    8. beatInfo.setMetadata(instance.getMetadata());
    9. beatInfo.setScheduled(false);
    10. beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    11. return beatInfo;
    12. }
  • 我们可以看到这里设置的该实例的心跳信息,**period**就是比较关心的心跳间隔时间,其默认的时间是 _**DEFAULT_HEART_BEAT_INTERVAL **_**= TimeUnit.**_**SECONDS**_**.toMillis**_**(**_**5**_**)**_,即默认5s为其心跳间隔时间,心跳信息是保存在客户端的BeatReactor里面,当然,心跳的代码就不在这里梳理了,后续会出心跳的代码分析

  • 再看看上面的代码片段,放入了实例的一些相关信息,如服务名,IP,端口,权重等,这些也是对于一个服务注册中心来说会比较重要的
  • 请求地址 **UtilAndComs.**_**nacosUrlInstance**_**/nacos/v1/ns/instance**,即向nacos的服务端发送一个POST请求


服务端

代码地址

com.alibaba.nacos.naming.controllers#register

服务端代码剖析

register

  1. /**
  2. * Register new instance.
  3. * 服务注册--->client 端 post请求 nacos/v1/ns/instance
  4. * client : {@link com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#registerService}
  5. * @param request http request
  6. * @return 'ok' if success
  7. * @throws Exception any error during register
  8. */
  9. @CanDistro
  10. @PostMapping
  11. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  12. public String register(HttpServletRequest request) throws Exception {
  13. final String namespaceId = WebUtils
  14. .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  15. final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  16. NamingUtils.checkServiceNameFormat(serviceName);
  17. final Instance instance = HttpRequestInstanceBuilder.newBuilder()
  18. .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
  19. // InstanceOperatorClientImpl
  20. getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
  21. return "ok";
  22. }
  • 这里打印一下instancetoString,能更加直观的看看实例信息: Instance{instanceId=’192.168.1.10#10001#DEFAULT#DEFAULT_GROUP@@server1’,ip=’192.168.1.10’,port=10001,weight=1.0,healthy=true,enabled=true,ephemeral=true,clusterName=’DEFAULT’,serviceName=’DEFAULT_GROUP@@server1’,metadata={preserved.register.source=SPRING_CLOUD}}

    1. 懒得排版了,就这样看吧TAT
  • 上面大部分代码都是格式化了一下服务相关的信息,主要的注册是走的InstanceOperatorClientImpl的registerInstance方法,跟踪源码后,服务注册的核心代码:**EphemeralClientOperationServiceImpl#registerInstance** 接下来贴一下代码:

    1. /**
    2. * This method creates {@code IpPortBasedClient} if it don't exist.
    3. */
    4. @Override
    5. public void registerInstance(String namespaceId, String serviceName, Instance instance) {
    6. boolean ephemeral = instance.isEphemeral();
    7. String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    8. createIpPortClientIfAbsent(clientId);
    9. Service service = getService(namespaceId, serviceName, ephemeral);
    10. clientOperationService.registerInstance(service, instance, clientId);
    11. }
    12. @Override
    13. public void registerInstance(Service service, Instance instance, String clientId) {
    14. Service singleton = ServiceManager.getInstance().getSingleton(service);
    15. Client client = clientManager.getClient(clientId);
    16. if (!clientIsLegal(client, clientId)) {
    17. return;
    18. }
    19. InstancePublishInfo instanceInfo = getPublishInfo(instance);
    20. client.addServiceInstance(singleton, instanceInfo);
    21. client.setLastUpdatedTime();
    22. // 发布事件
    23. // 客户端注册事件
    24. NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    25. // 实例元数据事件
    26. NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    27. }
  • 上面的第一个**registerInstance**方法,实际上是创建了一个client(如果这个client是第一次注册上来,则创建,如果不是则不做处理,如 192.168.0.6:8080#true,这里的true指短暂的客户端,第一次注册的话,则会生成一个新的client), **createIpPortClientIfAbsent(clientId)**,以及心跳相关信息,这个会在后续文章中详解,也就是在这里,向下文中的ClientManager的clients这个map中放入的客户端的信息等

  • 在这里,有几个可能会感兴趣的东西,ServiceManager,Client,InstancePublishInfo,NotifyCenter 所以,实列注册的时候实际做的事情都在这几个里面了

    ServiceManager

    1. public class ServiceManager {
    2. private static final ServiceManager INSTANCE = new ServiceManager();
    3. private final ConcurrentHashMap<Service, Service> singletonRepository;
    4. private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    5. //...
    6. }
  • 首先很明确的点就是,ServiceManager是单例的,这点在上面的**getInstance()**表现的很明确

  • 两个ConcurrentHashMap,分别保存单例的服务,和命名空间下的服务集合

image.png

  • 贴下debug的图片,客户端我写了两个服务,server1server2,在singletonRepository中保存了这两个服务的信息,所以size为2,同时namespaceSingletonMaps保存了这两个服务,server1和server2,他们均在public的这个命名空间下,即public->(server1,server2),所以,说明了singletonRepository仅保存服务这个维度,namespaceSingletonMaps保存的是该命名空间下的所有的服务
  • 注意**Service{namespace='public', group='DEFAULT_GROUP', name='server1', ephemeral=true, revision=0}**这里的Service里面是的属性是没有端口,ip等属性的,意味着这里的Service是指服务(server.name指定的那个值),就算有几个server1,这里的Service都是泛指的server1

Client

意思就是客户端,接下来看看getClient做了啥 这里对Client的描述有点乱,后面的文章会对Client这块做详细的描述
image.png

  • 可以看到 clientId是IP:端口+是否是短暂 的形式的字符串
  • 当然这里提一嘴,本人debug过程中,虽然只启动了一个服务,但是会存在192.168.1.6:6xxxx的clientId进来,~~暂时我理解是客户端一些其他端口在做心跳等工作,先暂时这么理解~ ~~后来看的时候发现这个 好像是1.x的客户端有个线程在调用/list接口拿服务列表,这个时候也会被当做一个客户端
  • getClientManagerById(clientId)实际做的事情是,根据后缀,即判断当前服务是短暂还是持久的,来获取相应的ClientManager 客户端管理器,短暂的ClientManager所对应的就是EphemeralIpPortClientManager,如下:

    1. @Component("ephemeralIpPortClientManager")
    2. public class EphemeralIpPortClientManager implements ClientManager {
    3. private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    4. }
  • 上面已经描述了client是在什么时候注册进clients这个map的,就不多做阐述了。这里定义了一个ConcurrentMap,key就是我们的服务的IP:端口+true,value保存的是客户端的IP,端口等信息,里面还有一些关于客户端心跳的一些任务,暂时就不往里面深挖了~ 不过可以扒一点这块的源码,感兴趣的话可以看看,不然就跳过到下一截 ```java @Component(“ephemeralIpPortClientManager”) public class EphemeralIpPortClientManager implements ClientManager {
    private final ConcurrentMap clients = new ConcurrentHashMap<>();

    @Override public boolean clientConnected(final Client client) {

    1. clients.computeIfAbsent(client.getClientId(), s -> {
    2. Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
    3. IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
    4. ipPortBasedClient.init();
    5. return ipPortBasedClient;
    6. });
    7. return true;

    }

    /**

    • Init client. */ public void init() { if (ephemeral) {
      1. beatCheckTask = new ClientBeatCheckTaskV2(this);
      2. HealthCheckReactor.scheduleCheck(beatCheckTask);
      } else {
      1. healthCheckTaskV2 = new HealthCheckTaskV2(this);
      2. HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
      } } }

/**

  • Schedule client beat check task with a delay. *
  • @param task client beat check task */ public class HealthCheckReactor { private static Map futureMap = new ConcurrentHashMap<>();

    public static void scheduleCheck(BeatCheckTask task) {

    1. Runnable wrapperTask =
    2. task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task)
    3. : task;
    4. futureMap.computeIfAbsent(task.taskKey(),
    5. k -> GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS));

    } }

/**

  • Instance beat checker for expired instance. */ public class ExpiredInstanceChecker implements InstanceBeatChecker { @Override public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
    1. boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
    2. if (expireInstance && isExpireInstance(service, instance)) {
    3. deleteIp(client, service, instance);
    4. }
    } } ```
  • **clientConnected**这个方法是向**clients**这个存储客户端的map中添加client的,同时我们能够看到初始化client的同时,执行了其内部的**init()**方法,当客户端是短暂的时候,向**HealthCheckReactor.scheduleCheck(beatCheckTask)**传入了一个**ClientBeatCheckTaskV2**我的理解是向健康检查的反应堆传入了一个客户端心跳检查的任务,跟踪源码后,可以看到 **ExpiredInstanceChecker**就是实际的节拍检测的落地,isExpireInstance的实际值等于**System.**_**currentTimeMillis() **_**- instance.getLastHeartBeatTime**_**() **_**> deleteTimeout**即当前时间减去实例的最后一次心跳时间是否大于超时时间,当然,关于心跳的这部分源码,后续有机会会单独邻出来扒拉

InstancePublishInfo

贴下获取InstancePublishInfo的代码,瞅两眼就差不多了

  1. /**
  2. * get publish info.
  3. */
  4. default InstancePublishInfo getPublishInfo(Instance instance) {
  5. InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
  6. if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
  7. result.getExtendDatum().putAll(instance.getMetadata());
  8. }
  9. if (StringUtils.isNotEmpty(instance.getInstanceId())) {
  10. result.getExtendDatum().put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
  11. }
  12. if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
  13. result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
  14. }
  15. if (!instance.isEnabled()) {
  16. result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
  17. }
  18. String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
  19. : instance.getClusterName();
  20. result.setHealthy(instance.isHealthy());
  21. result.setCluster(clusterName);
  22. return result;
  23. }

image.png

  • 其实里面没啥,_**Instance POJO of client published for Nacos v2.**_,准确的说,这个保存了实例的信息,对于2.0+的版本而言
  • 拿到InstancePublishInfo信息后,client会根据这些信息做些处理,代码如下 ```java // 这里贴下上面的代码,即获取到InstancePublishInfo的地方 public void registerInstance(Service service, Instance instance, String clientId) {
    1. //...
    2. InstancePublishInfo instanceInfo = getPublishInfo(instance);
    3. client.addServiceInstance(singleton, instanceInfo);
    4. //...
    }

public abstract class AbstractClient implements Client { @Override public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { if (null == publishers.put(service, instancePublishInfo)) { MetricsMonitor.incrementInstanceCount(); } NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); Loggers.SRV_LOG.info(“Client change for service {}, {}”, service, getClientId()); return true; } }

  1. - `**MetricsMonitor.incrementInstanceCount()**`MetricsMonitor,源码文档给出的解释是_**指标监视器**,它是一个单例的,全局的,里面有很多_**AtomicIntegerAtomicLong定义的变量,其中就有**`**private final AtomicInteger ipCount = new AtomicInteger**_**()**_**;**`说明此时会告诉_**指标监视器**_**,客户端的ip数+1了!**
  2. - 同时,发布了一个`**ClientEvent.ClientChangedEvent**`即客户端变化事件
  3. 这就是拿到**InstancePublishInfo**后做的相关操作<br />
  4. <a name="zSNiL"></a>
  5. #### NotifyCenter
  6. ```java
  7. NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  8. NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
  • 通过事件发布中心,发布了两个事件,**ClientRegisterServiceEvent**以及**InstanceMetadataEvent**,关于NotifyCenter的具体细节可以参考之前的文章,这里不过多阐述了,如果你看了上章节的事件通知,那么就能够串起来这个过程了,这里不做太多的阐述,简单的来说:这里通过一个单例NotifyCenter,向这个事件所对应的发布者添加消息,接着该发布者(线程)会不断的将消息广播给订阅了这个事件的订阅者们,由此,就能够让其订阅者去处理这个事件所需要做的相关操作。


流程总结~

至此,客户端注册的流程已经完结,接下来再从宏观的角度过一下流程

扣一扣服务注册的流程 - 图4

  • 客户端通过post请求,请求服务端的nacos,地址是**nacos/v1/ns/instance**,包装了当前服务的一些实例信息,如IP,端口,服务名等
  • Nacos服务端通过ClientManager保存客户端维度的信息,ServiceManager保存服务维度的信息,这里的客户端维度,你可以理解为是微服务集群中的一个服务,如订单微服务中的其中一个服务(可以理解成机器,或者pod),而服务维度呢,就是泛指你的这一组微服务,如订单服务,这么一说,即可联想到,服务对于客户端而言,是1对多的关系,每个客户端也会维持其心跳相关的信息,这个会在后续心跳相关源码梳理
  • NotifyCenter和MetricsMonitor均为单例,MetricsMonitor是Nacos的性能指标监控器,保存nacos的全局监控信息NotifyCenter是事件通知,会将事件通过发布者通知其订阅者,交给他们处理