本人看的源码版本是nacos2.0.3
不同版本的逻辑大致一致

客户端相关代码

还是从客户端注册的地方开始(注册的流程可以参考上一篇)
com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#registerService

  1. @Override
  2. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  3. // ...
  4. if (instance.isEphemeral()) {
  5. BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  6. beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  7. }
  8. //...
  9. }

省略了服务注册的逻辑,直接看看心跳逻辑

  • 在这里判断了当前的服务是否是 短暂的客户端(默认true),然后构建了一些心跳信息

    1. public class BeatInfo {
    2. private int port;
    3. private String ip;
    4. private double weight;
    5. private String serviceName;
    6. private String cluster;
    7. private Map<String, String> metadata;
    8. private volatile boolean scheduled;
    9. private volatile long period;
    10. private volatile boolean stopped;
    11. // ignore...
    12. }
  • 如当前实例的端口,IP,权重,服务名,等,需要注意的是period,以及stopped,这两个值的作用是跟心跳的延时以及控制心跳线程有关,后续代码会细化

  • **beatReactor.addBeatInfo**_**(**_**groupedServiceName, beatInfo**_**)**_ 向心跳反应堆添加心跳信息(源码稍后分析),后续的心跳的线程相关操作交由 BeatReactor

    BeatReactor

    下面贴的代码有点多,不过会对每个方法都详细描述(下列代码都是BeatReactor内部的代码

    首先看看其构造器

    1. public class BeatReactor implements Closeable {
    2. // ignore..
    3. private final ScheduledExecutorService executorService;
    4. public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
    5. this.serverProxy = serverProxy;
    6. // 获取合适的线程数
    7. int threadCount = initClientBeatThreadCount(properties);
    8. this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
    9. // ThreadFactory 设置的这个线程为守护线程,意味着该线程不出意外是会一直伴随着实例发送心跳
    10. @Override
    11. public Thread newThread(Runnable r) {
    12. Thread thread = new Thread(r);
    13. // 设置为守护线程,在没有用户线程可服务时会自动离开
    14. thread.setDaemon(true);
    15. thread.setName("com.alibaba.nacos.naming.beat.sender");
    16. return thread;
    17. }
    18. });
    19. }
    20. // ignore..
    21. }
  • 在其构造器中,初始化了一个线程池ScheduledExecutorService(可延迟后运行,或定期执行),其核心线程数为:ThreadUtils._getSuitableThreadCount(_1_) _> 1 ? ThreadUtils._getSuitableThreadCount(_1_) _/ 2 : 1 通过核数,计算出合适的线程数之和,如果>1,则取一半,不然则返回1

  • 同时,传入的线程工厂设置了其名称,以及其为守护线程,个人理解:线程池通过工厂创建出来的心跳线程均被标记为了守护线程,则说明当所有用户线程(或服务下线)退出的时候,即这个线程会被销毁(这里顺带提一下和后面代码相关的,心跳的时候是每5s都会通过线程池创建新的线程进行心跳包的发送,即发送完心跳就return了,这里标记为守护线程原因可能考虑到这5s内,如果服务down掉,能避免掉这次的心跳包的发送了,【个人理解】)
  • 所以说,心跳反应堆BeatReactor在创建初期,主要是初始化了其心跳的线程池的大小,以及设置了其线程工厂

addBeatInfo

  1. public class BeatReactor implements Closeable {
  2. // ignore ..
  3. private final ScheduledExecutorService executorService;
  4. public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
  5. public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
  6. NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
  7. // 心跳 key serviceName#192.168.0.6#8080
  8. String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
  9. BeatInfo existBeat;
  10. if ((existBeat = dom2Beat.remove(key)) != null) {
  11. existBeat.setStopped(true);
  12. }
  13. dom2Beat.put(key, beatInfo);
  14. // 添加细心跳任务
  15. executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
  16. MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
  17. }
  18. }
  • 这是在客户端注册的时候调用的方法,即生成心跳任务的地方
  • key serviceName#192.168.0.6#8080 这里的key由 服务名#IP#端口 拼凑而成
  • 向 dom2Beat 这个map添加了当前实例的信息 , 不过为什么用map来保存还不清楚,毕竟key对于当前客户端/服务而言 是唯一的
  • 向executorService发送了一个心跳任务,以及任务延迟时间(volatile修饰),即实例注册的时候就准备延迟Period的毫秒的时间后开始执行心跳任务的线程
  • MetricsMonitor这个玩意没用过,应该是性能指标监控之类的,有空再看


BeatTask !!!!!!

心跳任务

  1. public class BeatReactor implements Closeable {
  2. // ignore..
  3. /**
  4. * 发送心跳信息的线程
  5. */
  6. class BeatTask implements Runnable {
  7. BeatInfo beatInfo;
  8. public BeatTask(BeatInfo beatInfo) {
  9. this.beatInfo = beatInfo;
  10. }
  11. @Override
  12. public void run() {
  13. if (beatInfo.isStopped()) {
  14. return;
  15. }
  16. // 心跳的间隔时间
  17. long nextTime = beatInfo.getPeriod();
  18. try {
  19. // 发送心跳
  20. JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
  21. long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
  22. boolean lightBeatEnabled = false;
  23. if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
  24. lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
  25. }
  26. BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
  27. if (interval > 0) {
  28. nextTime = interval;
  29. }
  30. int code = NamingResponseCode.OK;
  31. if (result.has(CommonParams.CODE)) {
  32. code = result.get(CommonParams.CODE).asInt();
  33. }
  34. // 未找到请求的资源 则会重试注册
  35. if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
  36. Instance instance = new Instance();
  37. instance.setPort(beatInfo.getPort());
  38. instance.setIp(beatInfo.getIp());
  39. instance.setWeight(beatInfo.getWeight());
  40. instance.setMetadata(beatInfo.getMetadata());
  41. instance.setClusterName(beatInfo.getCluster());
  42. instance.setServiceName(beatInfo.getServiceName());
  43. instance.setInstanceId(instance.getInstanceId());
  44. instance.setEphemeral(true);
  45. try {
  46. serverProxy.registerService(beatInfo.getServiceName(),
  47. NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
  48. } catch (Exception ignore) {
  49. }
  50. }
  51. } catch (NacosException ex) {
  52. NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
  53. JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
  54. } catch (Exception unknownEx) {
  55. NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
  56. JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
  57. } finally {
  58. // 再次提交一个检测心跳的线程任务 ! 时间为 nextTime = period
  59. // 因为每次都是使用的一个新的线程 通过beatInfo的volatile(保证线程可见)修饰的stopped
  60. // 来判断心跳是否需要终止
  61. executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
  62. }
  63. }
  64. }
  • 直接看BeatTask这个线程的run方法,首先会看到对BeatInfo的stopped字段的判断,检验当前心跳是否需要执行(该字段volatile修饰,**private volatile boolean stopped;**因为这涉及到多个线程共享一个变量了)
  • 发送心跳**serverProxy.sendBeat**_**(**_**beatInfo, BeatReactor.this.lightBeatEnabled**_**)**_**;** 里面的代码待会会接着看,先梳理这个心跳的线程内容
  • nextTime即心跳间隔时间第一次是客户端指定或者默认的,之后会获取从服务端响应的心跳间隔时间**CLIENT_BEAT_INTERVAL_FIELD**,服务端默认的其实也是5s,待会讲服务端的时候会贴
  • 当客户端返回的code码为 **code == NamingResponseCode.**_**RESOURCE_NOT_FOUND**_The requested resource is not found.)的时候,会重新服务注册的逻辑,**registerService**
  • 在finally代码块里,向线程池提交一个新的心跳任务!!!
  • 简单的总结一下就是,客户端的心跳是通过线程池来发送心跳的,每次心跳发送完都会向线程池提交一个新的心跳任务

NamingHttpClientProxy

sendBeat

这里就是上面的BeatReactor的BeatTask调用发送心跳**sendBeat**的方法

  1. /**
  2. * Send beat.
  3. */
  4. public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
  5. if (NAMING_LOGGER.isDebugEnabled()) {
  6. NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
  7. }
  8. Map<String, String> params = new HashMap<String, String>(16);
  9. Map<String, String> bodyMap = new HashMap<String, String>(2);
  10. if (!lightBeatEnabled) { // lightBeatEnabled = false
  11. bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
  12. }
  13. params.put(CommonParams.NAMESPACE_ID, namespaceId);
  14. params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
  15. params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
  16. params.put(IP_PARAM, beatInfo.getIp());
  17. params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
  18. // /nacos/v1/ns/instance/beat -- PUT请求
  19. String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
  20. return JacksonUtils.toObj(result);
  21. }
  • 包装了一下当前实例的一些信息 beatInfo
  • PUT 请求接口**/nacos/v1/ns/instance/beat**


服务端相关代码

beat

这里是服务端处理心跳请求的controller
com.alibaba.nacos.naming.controllers.InstanceController#beat

  1. /**
  2. * 客户端上报心跳信息的地方
  3. */
  4. @CanDistro
  5. @PutMapping("/beat")
  6. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  7. public ObjectNode beat(HttpServletRequest request) throws Exception {
  8. ObjectNode result = JacksonUtils.createEmptyJsonNode();
  9. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
  10. String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
  11. RsInfo clientBeat = null;
  12. if (StringUtils.isNotBlank(beat)) {
  13. clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
  14. }
  15. String clusterName = WebUtils
  16. .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
  17. String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
  18. int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
  19. if (clientBeat != null) {
  20. if (StringUtils.isNotBlank(clientBeat.getCluster())) {
  21. clusterName = clientBeat.getCluster();
  22. } else {
  23. // fix #2533
  24. clientBeat.setCluster(clusterName);
  25. }
  26. ip = clientBeat.getIp();
  27. port = clientBeat.getPort();
  28. }
  29. String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  30. String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  31. NamingUtils.checkServiceNameFormat(serviceName);
  32. Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
  33. serviceName, namespaceId);
  34. BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
  35. // 对请求做一些操作 用于处理 1.x 客户端的某些指定请求
  36. builder.setRequest(request);
  37. // 处理心跳
  38. int resultCode = getInstanceOperator()
  39. .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
  40. result.put(CommonParams.CODE, resultCode);
  41. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
  42. getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
  43. result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
  44. return result;
  45. }
  • 首先做的是根据传入的心跳信息,获取一些相关的参数
  • 不过主要处理服务端接收到心跳逻辑的地方是第41行的handleBeat
  • 同时返回了 CLIENT_BEAT_INTERVAL 即心跳间隔时间,回传给客户端的,对应了BeatTask里的获取nextTime

handleBeat

  1. public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
  2. RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
  3. Service service = getService(namespaceId, serviceName, true);
  4. String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);
  5. //拿到客户端的信息
  6. IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
  7. if (null == client || !client.getAllPublishedService().contains(service)) {
  8. if (null == clientBeat) {
  9. return NamingResponseCode.RESOURCE_NOT_FOUND;
  10. }
  11. Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();
  12. registerInstance(namespaceId, serviceName, instance);
  13. client = (IpPortBasedClient) clientManager.getClient(clientId);
  14. }
  15. if (!ServiceManager.getInstance().containSingleton(service)) {
  16. throw new NacosException(NacosException.SERVER_ERROR,
  17. "service not found: " + serviceName + "@" + namespaceId);
  18. }
  19. if (null == clientBeat) {
  20. clientBeat = new RsInfo();
  21. clientBeat.setIp(ip);
  22. clientBeat.setPort(port);
  23. clientBeat.setCluster(cluster);
  24. clientBeat.setServiceName(serviceName);
  25. }
  26. ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
  27. // 异步 立即执行
  28. HealthCheckReactor.scheduleNow(beatProcessor);
  29. client.setLastUpdatedTime();
  30. return NamingResponseCode.OK;
  31. }
  • ClientManager里面获取到当前客户端信息 IpPortBasedClient
  • 第一个if(第7行) 会判断当前客户端是否存在,不存在则响应 RESOURCE_NOT_FOUND(**The requested resource is not found.**),并且看当前客户端内的**ConcurrentHashMap<Service, InstancePublishInfo> publishers** 是否存在当前的服务,如果不存在,则直接调用注册实例的方法
  • 第二个if(第15行)从ServiceManager查看当前服务是否存在,不存在则直接抛出异常
  • 根据入参,创建了一个任务**ClientBeatProcessorV2**,并且使用 ScheduledExecutorService 提交一个异步任务
  • 总的来说,这块的代码是拿到当前客户端的信息,并且走了一个异步任务,接下来看看这个异步任务做了些什么?

ClientBeatProcessorV2 心跳的核心逻辑

com.alibaba.nacos.naming.healthcheck.heartbeat#ClientBeatProcessorV2

  1. public class ClientBeatProcessorV2 implements BeatProcessor {
  2. private final String namespace;
  3. private final RsInfo rsInfo;
  4. private final IpPortBasedClient client;
  5. public ClientBeatProcessorV2(String namespace, RsInfo rsInfo, IpPortBasedClient ipPortBasedClient) {
  6. this.namespace = namespace;
  7. this.rsInfo = rsInfo;
  8. this.client = ipPortBasedClient;
  9. }
  10. @Override
  11. public void run() {
  12. if (Loggers.EVT_LOG.isDebugEnabled()) {
  13. Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
  14. }
  15. String ip = rsInfo.getIp();
  16. int port = rsInfo.getPort();
  17. String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
  18. String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
  19. Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
  20. HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
  21. // 根据之前保存的IP和端口 校验一下还对不对 按道理来说不存在不对的情况吧
  22. if (instance.getIp().equals(ip) && instance.getPort() == port) {
  23. if (Loggers.EVT_LOG.isDebugEnabled()) {
  24. Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
  25. }
  26. // 设置实例最后一次的心跳检测的时间,并且将状态设置成为健康
  27. instance.setLastHeartBeatTime(System.currentTimeMillis());
  28. if (!instance.isHealthy()) {
  29. instance.setHealthy(true);
  30. Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
  31. rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
  32. NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
  33. NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
  34. }
  35. }
  36. }
  37. }
  • 这是上面通过线程池调用的异步线程
  • 先获取到当前实例信息 HealthCheckInstancePublishInfo ,校验当前的IP和端口是否正确,不过一般情况下应该都是true的吧,什么情况会出现客户端上报的IP和端口端的信息不对等呢….
  • 接着设置了最后一次心跳时间,同时如果当前实例如果不是healthy状态,则设置成true,最后通过NotifyCenter发布了两个事件,服务修改事件和客户端修改事件


UnhealthyInstanceChecker 检测健康状态

  1. public class UnhealthyInstanceChecker implements InstanceBeatChecker {
  2. @Override
  3. public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
  4. if (instance.isHealthy() && isUnhealthy(service, instance)) {
  5. changeHealthyStatus(client, service, instance);
  6. }
  7. }
  8. }
  • 不健康的实例检测,毕竟ClientBeatProcessorV2是设置成健康,可没什么能够设置成不健康的地方… 这个类其实具体应该放在Client的地方来描述,但是毕竟是讲心跳,所以得闭环一下…
  • 其实,每一个客户端(**IpPortBasedClient**)都会开一个周期性的5s的任务,来检测最后一次心跳时间是否正常(**isUnhealthy**这个方法,**System.**_**currentTimeMillis****() **_**- instance.getLastHeartBeatTime**_**() **_**> beatTimeout**),如果不健康(则会根据最后一次心跳时间来判断,**ClientBeatProcessorV2**这个异步任务修改的时间),则会将实例信息修改为不健康
  • 当然这个线程如何开始工作的,会在后面的Client相关的文章里描述

到此心跳的整个闭环大致是这样,当然,这是我的理解…