一、Nacos Client
流程简介
构建Instance实例,将其封装成Http请求,然后发送至Server进行注册。通过BeatReactor,定时向Server上报心跳信息。
设计要点
1、BeatInfo
如果是临时实例,则不会在Nacos服务端持久化存储,需要通过上报心跳的方式进行保活,如果一段时间内没有上报心跳,则会被Nacos服务端摘除。在被摘除后如果又开始上报心跳,则会重新将这个实例注册。持久化实例则会持久化到Nacos服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康。
2、getServerList
private List<String> getServerList() {List<String> snapshot = serversFromEndpoint;if (!CollectionUtils.isEmpty(serverList)) {snapshot = serverList;}return snapshot;}
这里其实涉及到Nacos集群的概念了,serversFromEndpoint其实是向Nacos Server获取当前Nacos集群中的server列表,但是Nacos中用户设置的Nacos Server Addr的优先级是大于Nacos Client端去远程获取到的server列表的。
3、Nacos Server的选取
if (servers != null && !servers.isEmpty()) {Random random = new Random(System.currentTimeMillis());int index = random.nextInt(servers.size());for (int i = 0; i < servers.size(); i++) {String server = servers.get(index);try {return callServer(api, params, body, server, method);} catch (NacosException e) {exception = e;if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("request {} failed.", server, e);}}index = (index + 1) % servers.size();}}
如果Nacos Serve是集群模式部署的话,那么会采用随机策略选择一个Nacos Server Addr作为进行Instance注册的Http请求地址;如果请求失败的话则再次重新选取一个Nacos Server。
二、Nacos Server
流程简介
完成实例存入Service,同时会触发两个事件。其中一个事件是用于数据同步的,Nacos服务端会根据这个服务是否是临时对象的信息,使用Distro或者Raft协议进行同步,通知其他的Nacos节点该服务发生了变更;另一个事件则通知在该Nacos服务节点上订阅了该服务的订阅者,并根据订阅者信息,通过UDP的方式,把最新的服务列表推送到订阅者客户端上,这就完成了一次服务注册流程。
设计要点
由分级存储模型可知,健康检查、同步机制集中在Cluster层。
1、CanDistro**
实例注册时,DistroFilter会进行请求拦截,以判断当前服务是否由当前节点负责处理。如果不是则转发到相应的节点。具体判断逻辑由 DistroMapper 负责。
/*** Judge whether current server is responsible for input service.** @param serviceName service name* @return true if input service is response, otherwise false*/public boolean responsible(String serviceName) {final List<String> servers = healthyList;if (!switchDomain.isDistroEnabled() || ApplicationUtils.getStandaloneMode()) {return true;}if (CollectionUtils.isEmpty(servers)) {// means distro config is not ready yetreturn false;}int index = servers.indexOf(ApplicationUtils.getLocalAddress());int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}int target = distroHash(serviceName) % servers.size();// 类hash一致性算法return target >= index && target <= lastIndex;}
2、ClientBeatCheckTask
检查并更新客户端临时实例的状态,如果过期的话则从实例列表中移除,同时发布事件事件:
- 服务变更 ServiceChangeEvent
实例超时 InstanceHeartbeatTimeoutEvent
@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// 标记:first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());// 发布事件getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// 清除:then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
3、HealthCheckTask
目前只有持久化的实例才可以选定健康检查方式,非持久化的实例只会使用心跳进行健康检查。
4、Notifier
消息通知器。
5、PushService
消息推送器,通过UDP的方式把最新的服务列表推送到订阅者客户端上。private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");return null;}if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return ackEntry;}try {if (!ackMap.containsKey(ackEntry.key)) {totalPush++;}ackMap.put(ackEntry.key, ackEntry);udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());Loggers.PUSH.info("send udp packet: " + ackEntry.key);// 通过UDP向Subscriber推送服务实例信息udpSocket.send(ackEntry.origin);ackEntry.increaseRetryTime();GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);return ackEntry;} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,ackEntry.origin.getAddress().getHostAddress(), e);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return null;}}
6、DistroDelayTaskExecuteEngine
Nacos Server 节点数据同步器
