微服务注册中心自我保护机制思想
spring cloud eureka,微服务注册中心里,有一个东西,他是说有一个机制,自我保护机制。
register-server部署的机器出现了故障,导致网络有了问题,导致大量的服务实例都没有办法发送心跳信息过来,导致register-server上的ServiceAliveMonitor线程在检查的时候,会发现大面积的出现各个服务实例超过90秒没发送心跳。
现在有一个解决办法就是把这些超过90秒没有发心跳的实例全部摘除,这样靠谱吗?不靠谱!
就需要一个自我保护机制,这个自我保护就是说什么情况下可以让register-server认为是自己的网络问题导致别人的心跳发不过来呢?有一个比例,spring cloud eureka提供的一个比例是25%,超过25%的服务实例的心跳都没法过来。
案例中,如果ServiceAliveMonitor发现超过25%的服务实例的心跳,都没及时更新,就可以认为是可能register-server自己部署的机器的网络出现了故障,导致别的服务的心跳发不过来。
这个时候,开始自动进入自我保护机制,他不再摘除任何的服务实例,避免说在自己网络故障的情况下,一下子就摘除50%,60%的服务实例,导致注册表的数据出现严重的数据缺失。
如果后面再次运行的时候,发现已经有超过85%的服务实例其实恢复了发送心跳了。此时ServiceAliveMonitor可以自动退出自我保护的状态,可以继续去检查是不是某个服务实例他的心跳没有发送过来,没有及时更新,超过90秒还没更新,此时认为这个服务实例就宕机了,就要摘除。
那么应该如何去计算这比例呢?
- 可以收集一下每分钟的心跳总次数。比如说,现在有10个服务实例,按理说每分钟应该有20次心跳,但是某一分钟你发现收到的心跳次数只有8次,此时就发现8 < 20 * 0.85,也就是说,发现有超过25%的服务实例的心跳没有正常的发送。如果是这种情况,则认为是自己的网络出了问题,进入自我保护机制,不再摘除服务实例了。
如果一分钟的心跳次数,达到了18,18 > 20 * 0.85,此时就认为是服务注册中心的网络肯定是正常了,此时就退出自我保护机制,就可以正常的检查服务实例的心跳是否在90秒内更新过,如果没更新过,就可以自动摘除这个故障实例
synchronized实现服务的心跳计数器
添加一个计数器组件
/**
* 心跳测量计数器
*/
public class HeartbeatMessuredRate {
private static HeartbeatMessuredRate instance = new HeartbeatMessuredRate();
/**
*最近一分钟心跳次数
*/
private long latestMinuteHeartbeatRate = 0L;
/**
* 最近一分钟时间戳
*/
private long latestMinuteTimestamp = System.currentTimeMillis();
public static HeartbeatMessuredRate getInstance() {
return instance;
}
public synchronized void increment() {
long currentTime = System.currentTimeMillis();
//每隔60秒把这个心跳次数清零,重新计数
if (currentTime - latestMinuteTimestamp > 60 * 1000) {
latestMinuteHeartbeatRate = 0l;
this.latestMinuteTimestamp = System.currentTimeMillis();
}
latestMinuteHeartbeatRate++;
}
/**
* 获取最近一分钟的心跳次数
*/
public long get() {
return latestMinuteHeartbeatRate;
}
}
服务下线
在通信组件HttpSender中添加下面方法,这个方法实际上是要通过rpc去调用服务端对应的下线方法
/**
* 服务下线
* @param serviceName
* @param serviceInstanceId
*/
public void cancel(String serviceName, String serviceInstanceId) {
System.out.println("服务实例下线【"+serviceName+","+serviceInstanceId+"】");
}
自我保护机制阈值修改
新建自我保护机制类
public class SelfProtectionPolicy {
private static SelfProtectionPolicy instance = new SelfProtectionPolicy();
/**
* 期望的心跳数
*/
private long expectedHeartbeatRate = 0L;
/**
* 期望的心跳次数阈值,
*/
private long expectedHeartbeatThreshold = 0L;
public static SelfProtectionPolicy getInstance() {
return instance;
}
}
上线服务和下线服务的方法中增加下面代码
//更新自我保护机制阈值,要加锁,因为这个SelfProtectionPolicy对象的值会被不断的修改
synchronized (SelfProtectionPolicy.class) {
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
//eruake实现方式,上线一个服务就要+2下线就是-2,每个实例每分钟是两次心跳
selfProtectionPolicy.setExpectedHeartbeatRate(
selfProtectionPolicy.getExpectedHeartbeatRate() + 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold(
(long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
基于Synchronized开启自我保护机制
在代码监控线程中实现
public void run() {
Map<String, Map<String, ServiceInstance>> registryRegistry = null;
// System.out.println(Thread.currentThread().getName() + "线程的线程组是:" +
// Thread.currentThread().getThreadGroup());
while (true) {
try {
//判断是否要开启自我保护机制,开启自我保护机制意味着就不能随便再去下线服务了。
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
if (selfProtectionPolicy.isEnable()) {
Thread.sleep(CHECK_ALIVE_INTERVAL);
continue;
}
//获取注册表
registryRegistry = this.registry.getRegistry();
for (String serviceName : registryRegistry.keySet()) {
Map<String, ServiceInstance> serviceInstanceMap =
registryRegistry.get(serviceName);
for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {
//服务实例距离上一次心跳已经超过90秒了
//就要把这个实例摘除
if (!serviceInstance.isAlive()) {
registry.remove(serviceInstance.getServiceName(),
serviceInstance.getServiceInstanceId());
synchronized (SelfProtectionPolicy.class) {
//eruake实现方式,下线一个服务就要-2,每个实例每分钟是两次心跳
selfProtectionPolicy.setExpectedHeartbeatRate(
selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold(
(long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
}
}
}
Thread.sleep(CHECK_ALIVE_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}