The source code I read is Nacos 2.0.3.
The logic is broadly the same across versions.
Client-side code
As before, we start from where the client registers (see the previous article for the registration flow).
com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#registerService
```java
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // ...
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // ...
}
```
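The service-registration logic is omitted here, so we can go straight to the heartbeat logic.
Here the code checks whether the current instance is ephemeral (true by default) and, if so, builds its heartbeat information: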
```java
public class BeatInfo {
    private int port;
    private String ip;
    private double weight;
    private String serviceName;
    private String cluster;
    private Map<String, String> metadata;
    private volatile boolean scheduled;
    private volatile long period;
    private volatile boolean stopped;
    // ignore...
}
```
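This includes the instance's port, IP, weight, service name and so on. Pay attention to period and stopped: period is the heartbeat delay, and stopped controls whether the heartbeat task keeps running. Both are covered in detail in the code below.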
**beatReactor.addBeatInfo(groupedServiceName, beatInfo)** adds the heartbeat info to the heartbeat reactor (the source is analyzed below); from then on, all heartbeat thread handling is delegated to BeatReactor.
BeatReactor
There is quite a bit of code below, but every method is described in detail (all of the following code lives inside BeatReactor).
First, its constructor:
```java
public class BeatReactor implements Closeable {
    // ignore..
    private final ScheduledExecutorService executorService;

    public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
        this.serverProxy = serverProxy;
        // Work out a suitable number of threads
        int threadCount = initClientBeatThreadCount(properties);
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            // The ThreadFactory marks each thread as a daemon; barring accidents, it keeps sending heartbeats for as long as the instance lives
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon thread: it goes away automatically once no user threads remain
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    // ignore..
}
```
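The constructor initializes a ScheduledExecutorService thread pool (which can run tasks after a delay or periodically). Its core pool size is:
**ThreadUtils.getSuitableThreadCount(1) > 1 ? ThreadUtils.getSuitableThreadCount(1) / 2 : 1**
That is, a suitable thread count is derived from the number of CPU cores; if it is greater than 1, half of it is used, otherwise 1.
- In addition, the thread factory passed in sets the thread name and marks each thread as a daemon. My understanding: because every heartbeat thread the pool creates is a daemon thread, it is destroyed once all user threads exit (for example, when the service shuts down). A related point that matters for the code later: every 5 s a new heartbeat task is submitted to the pool to send a heartbeat packet, and the task returns as soon as the packet has been sent. Marking the threads as daemons may be meant to ensure that if the service goes down within those 5 s, the pending heartbeat is simply never sent [my own interpretation].
- So, when the heartbeat reactor BeatReactor is created, it mainly sizes its heartbeat thread pool and sets up its thread factory.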
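To make the sizing formula and the thread factory concrete, here is a minimal Java sketch. It assumes ThreadUtils.getSuitableThreadCount(n) returns the smallest power of two that is at least cores * n. That is my reading of the Nacos helper, which is re-implemented here rather than called. BeatPoolSketch and its methods are hypothetical names, not Nacos API.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

final class BeatPoolSketch {

    // Assumption: smallest power of two >= cores * multiple (re-implemented, not the Nacos class).
    static int suitableThreadCount(int cores, int multiple) {
        int workers = 1;
        while (workers < cores * multiple) {
            workers <<= 1;
        }
        return workers;
    }

    // The formula quoted above: halve the suitable count if it exceeds 1, otherwise use 1.
    static int beatThreadCount(int cores) {
        int suitable = suitableThreadCount(cores, 1);
        return suitable > 1 ? suitable / 2 : 1;
    }

    // Same idea as BeatReactor's factory: named daemon threads that never keep the JVM alive.
    static ThreadFactory daemonFactory() {
        return r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        };
    }

    // Builds a scheduled pool sized for the current machine.
    static ScheduledExecutorService newBeatExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ScheduledThreadPoolExecutor(beatThreadCount(cores), daemonFactory());
    }
}
```

Under that assumption, a 4-core machine gets 4 / 2 = 2 beat threads, a 6-core machine gets 8 / 2 = 4, and a single-core machine gets 1.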
addBeatInfo
```java
public class BeatReactor implements Closeable {
    // ignore ..
    private final ScheduledExecutorService executorService;

    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // Heartbeat key, e.g. serviceName#192.168.0.6#8080
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat;
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Schedule the new heartbeat task
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
}
```
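- This method is called when the client registers; it is where the heartbeat task is created.
- The key serviceName#192.168.0.6#8080 is built as serviceName#IP#port.
- The current instance's info is put into the dom2Beat map. I'm not sure why a map is used here, since the key is unique for the current client/service.
- A heartbeat task is submitted to executorService with a delay of period (a volatile field); in other words, as soon as the instance registers, the heartbeat task is scheduled to start after period milliseconds.
- I haven't used MetricsMonitor before; it appears to be for metrics monitoring. I'll look into it when I have time.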
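The replace-and-stop behaviour of addBeatInfo can be sketched as follows. SimpleBeat and BeatMapSketch are hypothetical, trimmed-down stand-ins for BeatInfo and BeatReactor, and the scheduling step is left as a comment. This also suggests one plausible reason for the map: a single client process can register several instances (different services, IPs or ports), and each one gets its own serviceName#ip#port entry. Re-registering the same key must stop the old heartbeat before a new one starts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BeatInfo with only the fields this sketch needs.
final class SimpleBeat {
    final String serviceName;
    final String ip;
    final int port;
    volatile boolean stopped; // read by heartbeat tasks running on other threads

    SimpleBeat(String serviceName, String ip, int port) {
        this.serviceName = serviceName;
        this.ip = ip;
        this.port = port;
    }
}

final class BeatMapSketch {
    final Map<String, SimpleBeat> dom2Beat = new ConcurrentHashMap<>();

    // Key format described above: serviceName#ip#port
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    void addBeatInfo(SimpleBeat beat) {
        String key = buildKey(beat.serviceName, beat.ip, beat.port);
        SimpleBeat existing = dom2Beat.remove(key);
        if (existing != null) {
            // The old task sees this flag on its next run and stops rescheduling itself.
            existing.stopped = true;
        }
        dom2Beat.put(key, beat);
        // The real addBeatInfo would now schedule a BeatTask after the beat's period.
    }
}
```

Note that the old heartbeat is not cancelled in the executor. It is only flagged, and its already-scheduled task returns early the next time it runs.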
BeatTask !!!!!!
The heartbeat task
```java
public class BeatReactor implements Closeable {
    // ignore..

    /**
     * Task that sends heartbeat information.
     */
    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            // Heartbeat interval
            long nextTime = beatInfo.getPeriod();
            try {
                // Send the heartbeat
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                // Requested resource not found: register the instance again
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            } catch (Exception unknownEx) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                        JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
            } finally {
                // Submit the next heartbeat task, delayed by nextTime (period unless the server sent an interval).
                // Each run is a new task, so the volatile stopped flag on beatInfo (visible across threads)
                // decides whether the heartbeat should end.
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```
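- Looking directly at BeatTask's run method, it first checks BeatInfo's stopped field to decide whether this heartbeat should run at all. The field is declared **private volatile boolean stopped;** because several threads share the variable.
- It sends the heartbeat with **serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);**. We'll look at that method shortly; first let's finish walking through the task itself.
- nextTime is the heartbeat interval. The first time it is the period specified by the client (or the default); after that it is taken from the interval in the server's response, **CLIENT_BEAT_INTERVAL_FIELD**. The server's default is also 5 s, which I'll show in the server section.
- When the code returned by the server is **code == NamingResponseCode.RESOURCE_NOT_FOUND** (The requested resource is not found.), the client runs the service registration logic again via **registerService**.
- In the finally block, a new heartbeat task is submitted to the thread pool!!!
- In short: the client sends heartbeats through a thread pool, and after each heartbeat is sent it submits a new heartbeat task to the pool.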
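The "reschedule in finally" pattern can be reproduced outside Nacos with a plain ScheduledExecutorService. In this sketch BeatState stands in for BeatInfo's volatile period and stopped fields, and a LongSupplier stands in for serverProxy.sendBeat, returning the server's interval in milliseconds. All of these names are hypothetical. The demo uses a fake server that answers with a 10 ms interval while the local period is 1000 ms, so three beats only arrive quickly if the server interval really overrides the period.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Shared state playing the role of BeatInfo's volatile period/stopped fields.
final class BeatState {
    volatile long period;
    volatile boolean stopped;

    BeatState(long period) {
        this.period = period;
    }
}

final class ChainedBeatTask implements Runnable {
    private final BeatState state;
    private final ScheduledExecutorService executor;
    // Hypothetical stand-in for sendBeat: returns the server's interval in ms (<= 0 means no hint).
    private final LongSupplier sendBeat;

    ChainedBeatTask(BeatState state, ScheduledExecutorService executor, LongSupplier sendBeat) {
        this.state = state;
        this.executor = executor;
        this.sendBeat = sendBeat;
    }

    @Override
    public void run() {
        if (state.stopped) {
            return; // nothing sent, nothing rescheduled: the chain ends here
        }
        long nextTime = state.period;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                nextTime = interval; // the server's interval overrides the local period
            }
        } catch (RuntimeException e) {
            // The real task logs the failure; the finally block keeps the chain alive either way.
        } finally {
            executor.schedule(new ChainedBeatTask(state, executor, sendBeat), nextTime, TimeUnit.MILLISECONDS);
        }
    }

    // Returns true if `beats` heartbeats were sent within timeoutMs, then stops the chain.
    static boolean demo(int beats, long timeoutMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch sent = new CountDownLatch(beats);
        BeatState state = new BeatState(1000);
        LongSupplier fakeServer = () -> {
            sent.countDown();
            return 10L;
        };
        executor.execute(new ChainedBeatTask(state, executor, fakeServer));
        try {
            return sent.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            state.stopped = true;
            executor.shutdownNow();
        }
    }
}
```

Creating a new task per run instead of using scheduleAtFixedRate is what lets the interval change on every beat, and it lets a single volatile flag end the chain.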
NamingHttpClientProxy
sendBeat
This is the **sendBeat** method that BeatReactor's BeatTask (above) calls to send the heartbeat.
```java
/**
 * Send beat.
 */
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(16);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) { // lightBeatEnabled = false
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put(IP_PARAM, beatInfo.getIp());
    params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
    // /nacos/v1/ns/instance/beat -- PUT request
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}
```
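- It packs some of the current instance's information (beatInfo) into request parameters; unless light beats are enabled, the full beatInfo is also serialized into the beat body field.
- It calls the endpoint **/nacos/v1/ns/instance/beat** with a PUT request.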
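For illustration, this sketch builds an equivalent PUT request with the JDK's java.net.http.HttpRequest but does not send it. It assumes the query-parameter names namespaceId, serviceName, clusterName, ip and port, and it assumes that bodyMap travels as a form-encoded body. It does not use Nacos' own HTTP client, and the server address is a placeholder. BeatRequestSketch is a hypothetical name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class BeatRequestSketch {

    // URL-encodes a map as key=value pairs joined by '&' (an empty map yields "").
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Builds, but does not send, a beat request shaped like sendBeat's (parameter names assumed).
    static HttpRequest build(String serverAddr, String namespaceId, String serviceName, String cluster,
                             String ip, int port, String beatJson, boolean lightBeatEnabled) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", cluster);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));

        Map<String, String> body = new LinkedHashMap<>();
        if (!lightBeatEnabled) {
            body.put("beat", beatJson); // full BeatInfo JSON only when light beats are off
        }
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance/beat?" + encode(params));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .PUT(HttpRequest.BodyPublishers.ofString(encode(body)))
                .build();
    }
}
```

With lightBeatEnabled = true the body is empty, so the server only sees the query parameters.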
Server-side code
beat
This is the controller that handles heartbeat requests on the server:
com.alibaba.nacos.naming.controllers.InstanceController#beat
```java
/**
 * Entry point where clients report heartbeats.
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
            serviceName, namespaceId);
    BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
    // Pass the request along so that certain requests from 1.x clients can be handled
    builder.setRequest(request);
    // Handle the heartbeat
    int resultCode = getInstanceOperator()
            .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
    result.put(CommonParams.CODE, resultCode);
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
            getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
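- It first extracts the relevant parameters from the incoming heartbeat information.
- The main server-side heartbeat handling, though, happens in the **handleBeat** call.
- It also returns CLIENT_BEAT_INTERVAL, the heartbeat interval sent back to the client (together with the result code and the lightBeatEnabled switch); this is the value BeatTask reads as nextTime.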
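The parameter resolution at the top of beat can be sketched like this. BeatBody, ResolvedBeat and BeatParamSketch are hypothetical names. The value DEFAULT stands in for UtilsAndCommons.DEFAULT_CLUSTER_NAME, and optional only approximates WebUtils.optional by treating a missing or empty parameter as absent.

```java
// Hypothetical stand-in for the RsInfo fields the controller reads.
final class BeatBody {
    String cluster;
    String ip;
    int port;

    BeatBody(String cluster, String ip, int port) {
        this.cluster = cluster;
        this.ip = ip;
        this.port = port;
    }
}

final class ResolvedBeat {
    final String cluster;
    final String ip;
    final int port;

    ResolvedBeat(String cluster, String ip, int port) {
        this.cluster = cluster;
        this.ip = ip;
        this.port = port;
    }
}

final class BeatParamSketch {
    // Assumed value of UtilsAndCommons.DEFAULT_CLUSTER_NAME.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    // Approximation of WebUtils.optional: fall back to the default for a missing or empty value.
    static String optional(String value, String defaultValue) {
        return value == null || value.isEmpty() ? defaultValue : value;
    }

    static ResolvedBeat resolve(String clusterParam, String ipParam, String portParam, BeatBody body) {
        String cluster = optional(clusterParam, DEFAULT_CLUSTER);
        String ip = optional(ipParam, "");
        int port = Integer.parseInt(optional(portParam, "0"));
        if (body != null) {
            if (body.cluster != null && !body.cluster.isBlank()) {
                cluster = body.cluster;
            } else {
                body.cluster = cluster; // same idea as "fix #2533": backfill the body's cluster
            }
            // A full beat body wins over the query parameters.
            ip = body.ip;
            port = body.port;
        }
        return new ResolvedBeat(cluster, ip, port);
    }
}
```

A light beat (no body) therefore relies entirely on the query parameters, while a full beat body overrides ip and port.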
handleBeat
```java
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
        RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
    Service service = getService(namespaceId, serviceName, true);
    String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);
    // Look up the client
    IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
    if (null == client || !client.getAllPublishedService().contains(service)) {
        if (null == clientBeat) {
            return NamingResponseCode.RESOURCE_NOT_FOUND;
        }
        Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();
        registerInstance(namespaceId, serviceName, instance);
        client = (IpPortBasedClient) clientManager.getClient(clientId);
    }
    if (!ServiceManager.getInstance().containSingleton(service)) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (null == clientBeat) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(cluster);
        clientBeat.setServiceName(serviceName);
    }
    ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
    // Run asynchronously, immediately
    HealthCheckReactor.scheduleNow(beatProcessor);
    client.setLastUpdatedTime();
    return NamingResponseCode.OK;
}
```
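- It fetches the current client, an IpPortBasedClient, from ClientManager.
- The first if checks whether the client exists and whether its **ConcurrentHashMap<Service, InstancePublishInfo> publishers** contains the current service. If either check fails and the request carries no beat body, it responds with RESOURCE_NOT_FOUND (**The requested resource is not found.**); if a beat body is present, it rebuilds the instance from it and calls the register-instance method directly.
- The second if checks with ServiceManager whether the current service exists, and throws an exception if it does not.
- From the arguments it creates a **ClientBeatProcessorV2** task and submits it asynchronously to a ScheduledExecutorService.
- Overall, this code fetches the current client and kicks off an asynchronous task. So what does that task do?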
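The branching in handleBeat can be condensed into a small decision function. The boolean inputs are hypothetical simplifications of the client and service lookups, and the enum is not Nacos' NamingResponseCode (the real method returns OK in both success cases). The interesting path is the first one. A light beat has no body, so a server that has lost the instance can only answer RESOURCE_NOT_FOUND, and that is exactly what makes the client's BeatTask call registerService again.

```java
// Hypothetical outcome type; the real method returns NamingResponseCode values.
enum BeatResult { RESOURCE_NOT_FOUND, OK_AFTER_REREGISTER, OK }

final class HandleBeatSketch {

    // serviceExists: whether ServiceManager knows the service at the time of the second check.
    static BeatResult decide(boolean clientExists, boolean clientPublishesService,
                             boolean hasBeatBody, boolean serviceExists) {
        boolean reRegistered = false;
        if (!clientExists || !clientPublishesService) {
            if (!hasBeatBody) {
                // Light beat: no body, so the instance cannot be rebuilt on the server.
                return BeatResult.RESOURCE_NOT_FOUND;
            }
            // Real code: build an Instance from the beat body and call registerInstance(...).
            reRegistered = true;
        }
        if (!serviceExists) {
            throw new IllegalStateException("service not found");
        }
        // Real code: HealthCheckReactor.scheduleNow(new ClientBeatProcessorV2(...)); client.setLastUpdatedTime();
        return reRegistered ? BeatResult.OK_AFTER_REREGISTER : BeatResult.OK;
    }
}
```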
ClientBeatProcessorV2: the core heartbeat logic
com.alibaba.nacos.naming.healthcheck.heartbeat#ClientBeatProcessorV2
```java
public class ClientBeatProcessorV2 implements BeatProcessor {

    private final String namespace;

    private final RsInfo rsInfo;

    private final IpPortBasedClient client;

    public ClientBeatProcessorV2(String namespace, RsInfo rsInfo, IpPortBasedClient ipPortBasedClient) {
        this.namespace = namespace;
        this.rsInfo = rsInfo;
        this.client = ipPortBasedClient;
    }

    @Override
    public void run() {
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        String ip = rsInfo.getIp();
        int port = rsInfo.getPort();
        String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
        String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
        Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
        // Check that the IP and port still match what was stored earlier; in principle they always should
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
            }
            // Record the instance's last heartbeat time and mark it healthy
            instance.setLastHeartBeatTime(System.currentTimeMillis());
            if (!instance.isHealthy()) {
                instance.setHealthy(true);
                Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                        rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
                NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
                NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
            }
        }
    }
}
```
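- This is the asynchronous task submitted to the thread pool above.
- It first gets the current instance info, HealthCheckInstancePublishInfo, and checks that the IP and port match. This should normally be true; I'm not sure in what situation the IP and port reported by the client would fail to match…
- It then sets the last heartbeat time. If the instance is not currently healthy, it sets healthy to true and publishes two events through NotifyCenter: a service-changed event and a client-changed event.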
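The state change in ClientBeatProcessorV2 boils down to the sketch below. PublishedInstance stands in for HealthCheckInstancePublishInfo, and a list of event names stands in for NotifyCenter. The current time is passed in instead of read from System.currentTimeMillis() so the behaviour is easy to check. All of these are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HealthCheckInstancePublishInfo.
final class PublishedInstance {
    final String ip;
    final int port;
    volatile long lastHeartBeatTime;
    volatile boolean healthy;

    PublishedInstance(String ip, int port, boolean healthy) {
        this.ip = ip;
        this.port = port;
        this.healthy = healthy;
    }
}

final class BeatProcessorSketch {
    // Stand-in for NotifyCenter: records the names of published events.
    final List<String> publishedEvents = new ArrayList<>();

    void process(PublishedInstance instance, String ip, int port, long nowMillis) {
        if (!instance.ip.equals(ip) || instance.port != port) {
            return; // mismatched beat is ignored, as in ClientBeatProcessorV2
        }
        instance.lastHeartBeatTime = nowMillis;
        if (!instance.healthy) {
            instance.healthy = true;
            // Events fire only on the unhealthy -> healthy transition, not on every beat.
            publishedEvents.add("ServiceChangedEvent");
            publishedEvents.add("ClientChangedEvent");
        }
    }
}
```

Every matching beat refreshes lastHeartBeatTime, but events are published only when an unhealthy instance becomes healthy again.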
UnhealthyInstanceChecker: checking health status
```java
public class UnhealthyInstanceChecker implements InstanceBeatChecker {

    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (instance.isHealthy() && isUnhealthy(service, instance)) {
            changeHealthyStatus(client, service, instance);
        }
    }
}
```
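- This is the unhealthy-instance check. ClientBeatProcessorV2 only ever marks instances healthy, and nothing there marks them unhealthy… Strictly speaking this class belongs in a discussion of Client, but since this article is about heartbeats, it is needed to close the loop…
- In fact, every client (**IpPortBasedClient**) starts a periodic task that runs every 5 s to check whether the last heartbeat time is still acceptable. The **isUnhealthy** method evaluates **System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout**, where the last heartbeat time is the one updated by the asynchronous **ClientBeatProcessorV2** task. If the check fails, the instance is marked unhealthy.
- How this task gets started will be covered in a later article about Client.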
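The check itself is simple arithmetic. In this sketch the clock is passed in, and the 15 s timeout is an assumption: I believe it matches Nacos' default heartbeat timeout, but verify it for your version. WatchedInstance and UnhealthyCheckSketch are hypothetical names.

```java
// Hypothetical stand-in for the fields the checker reads.
final class WatchedInstance {
    volatile long lastHeartBeatTime;
    volatile boolean healthy = true;

    WatchedInstance(long lastHeartBeatTime) {
        this.lastHeartBeatTime = lastHeartBeatTime;
    }
}

final class UnhealthyCheckSketch {
    private final long beatTimeoutMs;

    UnhealthyCheckSketch(long beatTimeoutMs) {
        this.beatTimeoutMs = beatTimeoutMs;
    }

    // Same condition as isUnhealthy above, with the current time passed in for testability.
    boolean isUnhealthy(WatchedInstance instance, long nowMillis) {
        return nowMillis - instance.lastHeartBeatTime > beatTimeoutMs;
    }

    // Like doCheck: only a currently healthy instance can be flipped to unhealthy.
    boolean doCheck(WatchedInstance instance, long nowMillis) {
        if (instance.healthy && isUnhealthy(instance, nowMillis)) {
            instance.healthy = false; // real code: changeHealthyStatus(...)
            return true;
        }
        return false;
    }
}
```

With a 5 s beat interval and the assumed 15 s timeout, an instance is marked unhealthy only after roughly three consecutive beats are missed, and strictly more than 15 s must have passed.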
That roughly completes the heartbeat loop. Of course, this is my own understanding…
