The source code examined here is Nacos 2.0.3.
The logic is broadly the same across versions.
Client-side code
As before, we start from the point where the client registers (see the previous article for the registration flow).
com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#registerService
```java
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // ...
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // ...
}
```
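The service registration logic is omitted; let's go straight to the heartbeat logic.
Here the client checks whether the instance is ephemeral (`true` by default) and, if so, builds the heartbeat info: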
```java
public class BeatInfo {
    private int port;
    private String ip;
    private double weight;
    private String serviceName;
    private String cluster;
    private Map<String, String> metadata;
    private volatile boolean scheduled;
    private volatile long period;
    private volatile boolean stopped;
    // ignore...
}
```
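It holds the instance's port, IP, weight, service name and so on. Pay attention to `period` and `stopped`: they control the heartbeat delay and whether the heartbeat task keeps running, respectively, and are covered in detail in the code below.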
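**`beatReactor.addBeatInfo(groupedServiceName, beatInfo)`**
This adds the heartbeat info to the heartbeat reactor (source analyzed below); from then on, everything related to the heartbeat threads is handled by `BeatReactor`.
There is quite a lot of code below, but every method is described in detail (all of the following code lives inside `BeatReactor`).
First, its constructor: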
```java
public class BeatReactor implements Closeable {
    // ignore..
    private final ScheduledExecutorService executorService;

    public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
        this.serverProxy = serverProxy;
        // Work out a suitable thread count
        int threadCount = initClientBeatThreadCount(properties);
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            // The ThreadFactory marks every thread as a daemon thread; barring accidents,
            // these threads keep sending heartbeats for as long as the instance runs
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon thread: it does not keep the JVM alive once no user (non-daemon) threads remain
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    // ignore..
}
```
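The constructor initializes a `ScheduledExecutorService` thread pool (which can run tasks after a delay or periodically), with a core pool size of:
```java
ThreadUtils.getSuitableThreadCount(1) > 1 ? ThreadUtils.getSuitableThreadCount(1) / 2 : 1
```
- Based on the number of CPU cores, `getSuitableThreadCount(1)` computes a suitable thread count; if that is greater than 1, half of it is used, otherwise 1.
- The thread factory passed in also sets the thread name and marks each thread as a daemon thread. My understanding: because every heartbeat thread the pool creates is a daemon thread, it does not keep the JVM alive, so once all user threads have exited (for example when the service shuts down) these threads go away with it. A related point from the code further down: every 5 s a new heartbeat *task* is submitted to the pool and returns as soon as the beat is sent; the pool reuses its threads rather than creating a new thread for every beat. Marking the threads as daemons presumably means that if the service goes down within that 5 s window, the pending beat is simply never sent. (This is my own interpretation.)
- So, when `BeatReactor` is created, it mainly sizes its heartbeat thread pool and installs its thread factory.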
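To make the pool sizing and the daemon-thread factory concrete, here is a minimal sketch. It assumes `ThreadUtils.getSuitableThreadCount(1)` rounds the core count up to the next power of two (my reading of the Nacos utility, not a verified contract); `BeatPoolSketch` and its methods are hypothetical names, and the core count is passed in so the formula can be checked directly.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class BeatPoolSketch {

    // Assumption: mirrors what ThreadUtils.getSuitableThreadCount is believed to do,
    // i.e. round cores * multiple up to the next power of two.
    static int suitableThreadCount(int cores, int multiple) {
        int workers = 1;
        while (workers < cores * multiple) {
            workers <<= 1;
        }
        return workers;
    }

    // The formula quoted above: half of the suitable count if it is > 1, otherwise 1.
    static int defaultBeatThreadCount(int cores) {
        int suitable = suitableThreadCount(cores, 1);
        return suitable > 1 ? suitable / 2 : 1;
    }

    // A scheduled pool whose threads are daemon threads, like BeatReactor's.
    static ScheduledExecutorService newBeatPool(int threads) {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // does not keep the JVM alive on its own
            t.setName("beat-sender-sketch"); // hypothetical name
            return t;
        };
        return new ScheduledThreadPoolExecutor(threads, factory);
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        int threads = defaultBeatThreadCount(cores);
        ScheduledExecutorService pool = newBeatPool(threads);
        boolean daemon = pool.submit(() -> Thread.currentThread().isDaemon()).get();
        System.out.println("cores=" + cores + ", threads=" + threads + ", daemon=" + daemon);
        pool.shutdown();
    }
}
```

For example, 6 cores round up to 8, so the pool gets 4 threads; a 2-core machine gets 1.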
addBeatInfo
```java
public class BeatReactor implements Closeable {
    // ignore ..
    private final ScheduledExecutorService executorService;

    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // Heartbeat key, e.g. serviceName#192.168.0.6#8080
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat;
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Schedule the heartbeat task
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
}
```
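- This method is called when the client registers; it is where the heartbeat task is created.
- The key `serviceName#192.168.0.6#8080` is built as service name#IP#port.
- The current instance's info is put into the `dom2Beat` map. It was not obvious to me why a map is used, since the key is unique for a given client/service; presumably it is because one client can register several instances across several services, and keying by service/IP/port lets a re-registration stop the old beat (the `remove` followed by `setStopped(true)` above).
- A heartbeat task is submitted to `executorService` with a delay of `period` (a `volatile` field), so the heartbeat task first runs `period` milliseconds after the instance registers.
- I have not used `MetricsMonitor`; it appears to be for metrics monitoring. I will look at it another time.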
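The key format and the stop-then-replace behavior of `addBeatInfo` can be sketched with a plain `ConcurrentHashMap`. This is a simplified, hypothetical model (`Dom2BeatSketch` and its `Beat` class are not Nacos types, and the service name in `main` is just an example value); it leaves out the actual scheduling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Dom2BeatSketch {

    // Minimal stand-in for BeatInfo: only the volatile stop flag matters here.
    static final class Beat {
        final String ip;
        final int port;
        volatile boolean stopped;

        Beat(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    final Map<String, Beat> dom2Beat = new ConcurrentHashMap<>();

    // Key format serviceName#ip#port, as in the comment in addBeatInfo.
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    // Re-adding the same service/ip/port stops the previous beat before replacing it,
    // so its already-scheduled task exits the next time it runs.
    void addBeatInfo(String serviceName, Beat beat) {
        String key = buildKey(serviceName, beat.ip, beat.port);
        Beat existing = dom2Beat.remove(key);
        if (existing != null) {
            existing.stopped = true;
        }
        dom2Beat.put(key, beat);
        // The real method also schedules a BeatTask here; omitted in this sketch.
    }

    public static void main(String[] args) {
        Dom2BeatSketch reactor = new Dom2BeatSketch();
        Beat first = new Beat("192.168.0.6", 8080);
        Beat second = new Beat("192.168.0.6", 8080);
        reactor.addBeatInfo("DEFAULT_GROUP@@demo", first);
        reactor.addBeatInfo("DEFAULT_GROUP@@demo", second);
        reactor.addBeatInfo("DEFAULT_GROUP@@other", new Beat("192.168.0.6", 8081));
        System.out.println(first.stopped + " " + second.stopped + " " + reactor.dom2Beat.size()); // true false 2
    }
}
```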
BeatTask (the important part!)
The heartbeat task
```java
public class BeatReactor implements Closeable {
    // ignore..

    /**
     * Task that sends heartbeat info.
     */
    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            // Heartbeat interval
            long nextTime = beatInfo.getPeriod();
            try {
                // Send the heartbeat
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                // The requested resource was not found: register again
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            } catch (Exception unknownEx) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                        JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
            } finally {
                // Schedule the next heartbeat task after nextTime (the period unless the server sent an interval).
                // Each beat is a new BeatTask (pool threads are reused), so whether the heartbeat
                // should stop is decided by beatInfo's volatile stopped flag, which is visible across threads.
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```
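- Look directly at `BeatTask.run`. It first checks the `stopped` field of `BeatInfo` to decide whether this beat should run at all. The field is declared `volatile` (**`private volatile boolean stopped;`**) because several threads share the same variable.
- It sends the heartbeat with **`serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);`**. We will look inside that method shortly; first, let's finish walking through the task itself.
- `nextTime` is the heartbeat interval. The first time it is the period specified by the client (or the default); after that it is taken from the interval in the server response, **`CLIENT_BEAT_INTERVAL_FIELD`**. The server's default is in fact also 5 s; see the server section below.
- When the code returned by the server is **`NamingResponseCode.RESOURCE_NOT_FOUND`** ("The requested resource is not found."), the client runs the registration logic again via **`registerService`**.
- In the `finally` block, a new heartbeat task is submitted to the thread pool!
- In short, the client sends heartbeats through a thread pool, and every time a beat has been sent (successfully or not) it submits a new heartbeat task to the pool.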
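The self-rescheduling pattern of `BeatTask` can be sketched with a `ScheduledExecutorService`. In this hypothetical model (`SelfSchedulingBeatSketch` is not a Nacos class), a `LongSupplier` stands in for `serverProxy.sendBeat` and returns the interval the server suggests, a single `volatile` flag plays the role of `BeatInfo.stopped`, and a `CountDownLatch` is only there so the demo can wait for a few beats.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class SelfSchedulingBeatSketch {

    private final ScheduledExecutorService executor;
    final AtomicInteger beatsSent = new AtomicInteger();
    volatile boolean stopped;

    SelfSchedulingBeatSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    // sendBeat returns the server-suggested interval in ms, or <= 0 to keep the
    // current delay (a hypothetical contract for this sketch).
    void scheduleBeat(long delayMs, LongSupplier sendBeat, CountDownLatch latch) {
        executor.schedule(() -> {
            if (stopped) {
                return; // stop flag checked first, as in BeatTask.run
            }
            long nextTime = delayMs;
            try {
                long interval = sendBeat.getAsLong();
                beatsSent.incrementAndGet();
                latch.countDown();
                if (interval > 0) {
                    nextTime = interval;
                }
            } finally {
                // Always submit the next task, even if sending threw.
                scheduleBeat(nextTime, sendBeat, latch);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        SelfSchedulingBeatSketch beat = new SelfSchedulingBeatSketch(pool);
        CountDownLatch threeBeats = new CountDownLatch(3);
        beat.scheduleBeat(10, () -> 20, threeBeats); // the "server" asks for 20 ms intervals
        threeBeats.await(5, TimeUnit.SECONDS);
        beat.stopped = true; // the next scheduled task returns without rescheduling
        System.out.println("beats sent before stop: " + beat.beatsSent.get());
        pool.shutdown();
    }
}
```

Rescheduling from `finally` rather than using `scheduleAtFixedRate` is what lets each beat pick up a new interval from the server.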
NamingHttpClientProxy
sendBeat
This is the **`sendBeat`** method that `BeatTask` in `BeatReactor` above calls to send the heartbeat.
```java
/**
 * Send beat.
 */
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(16);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) { // the full beat body is sent only when light beats are disabled
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put(IP_PARAM, beatInfo.getIp());
    params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
    // PUT /nacos/v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}
```
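- Wraps the current instance's info from `beatInfo` into request parameters; the full `beat` JSON body is added only when light beats are disabled.
- Sends a PUT request to **`/nacos/v1/ns/instance/beat`**.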
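The difference between a full beat and a light beat is just whether the `beat` body is attached. The sketch below rebuilds the two maps from `sendBeat`; the literal parameter names are my assumption about what the `CommonParams`, `IP_PARAM` and `PORT_PARAM` constants hold, the `Beat` class is a hypothetical trimmed-down `BeatInfo`, and its hand-rolled JSON does no escaping, unlike Jackson.

```java
import java.util.HashMap;
import java.util.Map;

public class BeatRequestSketch {

    // Hypothetical, trimmed-down beat payload (the real client serializes the whole BeatInfo).
    static final class Beat {
        final String serviceName;
        final String cluster;
        final String ip;
        final int port;

        Beat(String serviceName, String cluster, String ip, int port) {
            this.serviceName = serviceName;
            this.cluster = cluster;
            this.ip = ip;
            this.port = port;
        }

        // Simplified JSON without escaping; a real client would use a JSON library.
        String toJson() {
            return String.format("{\"serviceName\":\"%s\",\"cluster\":\"%s\",\"ip\":\"%s\",\"port\":%d}",
                    serviceName, cluster, ip, port);
        }
    }

    // Query parameters for PUT /nacos/v1/ns/instance/beat (names assumed, see above).
    static Map<String, String> buildParams(String namespaceId, Beat beat) {
        Map<String, String> params = new HashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", beat.serviceName);
        params.put("clusterName", beat.cluster);
        params.put("ip", beat.ip);
        params.put("port", String.valueOf(beat.port));
        return params;
    }

    // A light beat carries only the query parameters; otherwise the full beat goes in the body.
    static Map<String, String> buildBody(Beat beat, boolean lightBeatEnabled) {
        Map<String, String> body = new HashMap<>();
        if (!lightBeatEnabled) {
            body.put("beat", beat.toJson());
        }
        return body;
    }

    public static void main(String[] args) {
        Beat beat = new Beat("DEFAULT_GROUP@@demo", "DEFAULT", "192.168.0.6", 8080);
        System.out.println(buildParams("public", beat));
        System.out.println(buildBody(beat, false));
        System.out.println(buildBody(beat, true)); // {}
    }
}
```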
Server-side code
beat
This is the server-side controller that handles heartbeat requests.
com.alibaba.nacos.naming.controllers.InstanceController#beat
```java
/**
 * Where clients report their heartbeats.
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
            serviceName, namespaceId);
    BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
    // Pass the request to the builder; used to handle certain requests from 1.x clients
    builder.setRequest(request);
    // Handle the heartbeat
    int resultCode = getInstanceOperator()
            .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
    result.put(CommonParams.CODE, resultCode);
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
            getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
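- It first extracts the relevant parameters from the incoming heartbeat request.
- The main server-side heartbeat handling happens in the `handleBeat` call.
- It also returns `CLIENT_BEAT_INTERVAL`, the heartbeat interval sent back to the client; this is where `nextTime` in `BeatTask` comes from.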
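On the client, the fields of this response decide the next beat. The following hypothetical sketch (`BeatResponseSketch` is not a Nacos class, and its enum only stands in for the `NamingResponseCode` values) isolates that decision as a pure function, following the `interval > 0` and `RESOURCE_NOT_FOUND` checks in `BeatTask.run`.

```java
public class BeatResponseSketch {

    // Hypothetical stand-in for the NamingResponseCode values BeatTask looks at.
    enum Code { OK, RESOURCE_NOT_FOUND }

    static final class Decision {
        final long nextTime;
        final boolean reRegister;

        Decision(long nextTime, boolean reRegister) {
            this.nextTime = nextTime;
            this.reRegister = reRegister;
        }
    }

    // A positive interval from the server overrides the local period;
    // RESOURCE_NOT_FOUND means the client should register again.
    static Decision onBeatResponse(long period, long serverInterval, Code code) {
        long nextTime = serverInterval > 0 ? serverInterval : period;
        return new Decision(nextTime, code == Code.RESOURCE_NOT_FOUND);
    }

    public static void main(String[] args) {
        Decision d = onBeatResponse(5000, 5000, Code.OK);
        System.out.println(d.nextTime + " " + d.reRegister); // 5000 false
        d = onBeatResponse(5000, 0, Code.RESOURCE_NOT_FOUND);
        System.out.println(d.nextTime + " " + d.reRegister); // 5000 true
    }
}
```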
handleBeat
```java
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
        RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
    Service service = getService(namespaceId, serviceName, true);
    String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);
    // Get the client's info
    IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
    if (null == client || !client.getAllPublishedService().contains(service)) {
        if (null == clientBeat) {
            return NamingResponseCode.RESOURCE_NOT_FOUND;
        }
        Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();
        registerInstance(namespaceId, serviceName, instance);
        client = (IpPortBasedClient) clientManager.getClient(clientId);
    }
    if (!ServiceManager.getInstance().containSingleton(service)) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (null == clientBeat) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(cluster);
        clientBeat.setServiceName(serviceName);
    }
    ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
    // Run asynchronously, immediately
    HealthCheckReactor.scheduleNow(beatProcessor);
    client.setLastUpdatedTime();
    return NamingResponseCode.OK;
}
```
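- It gets the current client's info, an `IpPortBasedClient`, from `ClientManager`.
- The first `if` checks whether the client exists and whether the service is among the client's published services (**`ConcurrentHashMap<Service, InstancePublishInfo> publishers`**). If either check fails and the request carried no beat body (`clientBeat == null`, i.e. a light beat), it responds with `RESOURCE_NOT_FOUND` (**The requested resource is not found.**); otherwise it registers the instance directly from the beat info.
- The second `if` checks via `ServiceManager` whether the service exists and throws an exception if it does not.
- From the arguments it creates a **`ClientBeatProcessorV2`** task and submits it for immediate asynchronous execution (`HealthCheckReactor.scheduleNow`, backed by a `ScheduledExecutorService`).
- Overall, this code fetches the current client's info and hands the work to an asynchronous task. Next, let's see what that task does.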
ClientBeatProcessorV2: the core heartbeat logic
com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2
```java
public class ClientBeatProcessorV2 implements BeatProcessor {

    private final String namespace;

    private final RsInfo rsInfo;

    private final IpPortBasedClient client;

    public ClientBeatProcessorV2(String namespace, RsInfo rsInfo, IpPortBasedClient ipPortBasedClient) {
        this.namespace = namespace;
        this.rsInfo = rsInfo;
        this.client = ipPortBasedClient;
    }

    @Override
    public void run() {
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        String ip = rsInfo.getIp();
        int port = rsInfo.getPort();
        String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
        String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
        Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
        // Check the IP and port against what was stored; in principle they should always match
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
            }
            // Record the instance's last heartbeat time and mark it healthy
            instance.setLastHeartBeatTime(System.currentTimeMillis());
            if (!instance.isHealthy()) {
                instance.setHealthy(true);
                Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                        rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
                NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
                NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
            }
        }
    }
}
```
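- This is the asynchronous task submitted to the thread pool above.
- It first gets the instance info, `HealthCheckInstancePublishInfo`, and checks that the IP and port match. In normal operation this should always be true; I am not sure under what circumstances the IP and port reported by the client would fail to match.
- It then sets the last heartbeat time, and if the instance is not healthy it marks it healthy and publishes two events through `NotifyCenter`: a service-changed event and a client-changed event.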
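The state change performed by `ClientBeatProcessorV2.run` can be modeled as below. `InstanceInfo` is a hypothetical stand-in for `HealthCheckInstancePublishInfo`, the clock value is passed in rather than read from `System.currentTimeMillis()`, and the event names are returned as strings instead of being published through `NotifyCenter`.

```java
import java.util.ArrayList;
import java.util.List;

public class RefreshBeatSketch {

    // Hypothetical stand-in for HealthCheckInstancePublishInfo.
    static final class InstanceInfo {
        final String ip;
        final int port;
        volatile long lastHeartBeatTime;
        volatile boolean healthy;

        InstanceInfo(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    // Returns the names of the events that would be published (empty if nothing changed).
    static List<String> processBeat(InstanceInfo instance, String ip, int port, long now) {
        List<String> events = new ArrayList<>();
        if (instance.ip.equals(ip) && instance.port == port) {
            instance.lastHeartBeatTime = now;
            if (!instance.healthy) {
                instance.healthy = true;
                events.add("ServiceChangedEvent");
                events.add("ClientChangedEvent");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        InstanceInfo info = new InstanceInfo("192.168.0.6", 8080, false);
        System.out.println(processBeat(info, "192.168.0.6", 8080, 1_000)); // [ServiceChangedEvent, ClientChangedEvent]
        System.out.println(processBeat(info, "192.168.0.6", 8080, 6_000)); // []
        System.out.println(info.lastHeartBeatTime + " " + info.healthy);  // 6000 true
    }
}
```

Events are only emitted on the unhealthy-to-healthy transition, so a steady stream of beats just refreshes the timestamp.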
UnhealthyInstanceChecker: checking health status
```java
public class UnhealthyInstanceChecker implements InstanceBeatChecker {

    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (instance.isHealthy() && isUnhealthy(service, instance)) {
            changeHealthyStatus(client, service, instance);
        }
    }
}
```
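- This checks for unhealthy instances: `ClientBeatProcessorV2` only ever marks an instance healthy, and nothing there marks it unhealthy. Strictly speaking this class belongs in a discussion of `Client`, but since this article is about heartbeats, it is needed to close the loop.
- In fact, every client (**`IpPortBasedClient`**) starts a periodic 5 s task that checks whether its last heartbeat time is still recent, using **`isUnhealthy`**: **`System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout`**. The last heartbeat time it compares against is the one updated by the asynchronous **`ClientBeatProcessorV2`** task. If the check fails, the instance is marked unhealthy. How this task gets started will be covered in a later article about `Client`.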
That is roughly the whole heartbeat loop, at least as I understand it…