微服务注册中心自我保护机制思想
spring cloud eureka,微服务注册中心里,有一个东西,他是说有一个机制,自我保护机制。
register-server部署的机器出现了故障,导致网络有了问题,导致大量的服务实例都没有办法发送心跳信息过来,导致register-server上的ServiceAliveMonitor线程在检查的时候,会发现大面积的出现各个服务实例超过90秒没发送心跳。
现在有一个解决办法就是把这些超过90秒没有发心跳的实例全部摘除,这样靠谱吗?不靠谱!
就需要一个自我保护机制,这个自我保护就是说什么情况下可以让register-server认为是自己的网络问题导致别人的心跳发不过来呢?有一个比例,spring cloud eureka提供的一个比例是25%,超过25%的服务实例的心跳都没法过来。
案例中,如果ServiceAliveMonitor发现超过25%的服务实例的心跳,都没及时更新,就可以认为是可能register-server自己部署的机器的网络出现了故障,导致别的服务的心跳发不过来。
这个时候,开始自动进入自我保护机制,他不再摘除任何的服务实例,避免说在自己网络故障的情况下,一下子就摘除50%,60%的服务实例,导致注册表的数据出现严重的数据缺失。
如果后面再次运行的时候,发现已经有超过85%的服务实例其实恢复了发送心跳了。此时ServiceAliveMonitor可以自动退出自我保护的状态,可以继续去检查是不是某个服务实例他的心跳没有发送过来,没有及时更新,超过90秒还没更新,此时认为这个服务实例就宕机了,就要摘除。
那么应该如何去计算这比例呢?
- 可以收集一下每分钟的心跳总次数。比如说,现在有10个服务实例,按理说每分钟应该有20次心跳,但是某一分钟你发现收到的心跳次数只有8次,此时就发现8 < 20 * 0.85,也就是说,发现有超过25%的服务实例的心跳没有正常的发送。如果是这种情况,则认为是自己的网络出了问题,进入自我保护机制,不再摘除服务实例了。
如果一分钟的心跳次数,达到了18,18 > 20 * 0.85,此时就认为是服务注册中心的网络肯定是正常了,此时就退出自我保护机制,就可以正常的检查服务实例的心跳是否在90秒内更新过,如果没更新过,就可以自动摘除这个故障实例
synchronized实现服务的心跳计数器
添加一个计数器组件
/*** 心跳测量计数器*/public class HeartbeatMessuredRate {private static HeartbeatMessuredRate instance = new HeartbeatMessuredRate();/***最近一分钟心跳次数*/private long latestMinuteHeartbeatRate = 0L;/*** 最近一分钟时间戳*/private long latestMinuteTimestamp = System.currentTimeMillis();public static HeartbeatMessuredRate getInstance() {return instance;}public synchronized void increment() {long currentTime = System.currentTimeMillis();//每隔60秒把这个心跳次数清零,重新计数if (currentTime - latestMinuteTimestamp > 60 * 1000) {latestMinuteHeartbeatRate = 0l;this.latestMinuteTimestamp = System.currentTimeMillis();}latestMinuteHeartbeatRate++;}/*** 获取最近一分钟的心跳次数*/public long get() {return latestMinuteHeartbeatRate;}}
服务下线
在通信组件HttpSender中添加下面方法,这个方法实际上是要通过rpc去调用服务端对应的下线方法
/*** 服务下线* @param serviceName* @param serviceInstanceId*/public void cancel(String serviceName, String serviceInstanceId) {System.out.println("服务实例下线【"+serviceName+","+serviceInstanceId+"】");}
自我保护机制阈值修改
新建自我保护机制类
public class SelfProtectionPolicy {private static SelfProtectionPolicy instance = new SelfProtectionPolicy();/*** 期望的心跳数*/private long expectedHeartbeatRate = 0L;/*** 期望的心跳次数阈值,*/private long expectedHeartbeatThreshold = 0L;public static SelfProtectionPolicy getInstance() {return instance;}}
上线服务和下线服务的方法中增加下面代码
//更新自我保护机制阈值,要加锁,因为这个SelfProtectionPolicy对象的值会被不断的修改synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();//eruake实现方式,上线一个服务就要+2下线就是-2,每个实例每分钟是两次心跳selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}
基于Synchronized开启自我保护机制
在代码监控线程中实现
public void run() {Map<String, Map<String, ServiceInstance>> registryRegistry = null;// System.out.println(Thread.currentThread().getName() + "线程的线程组是:" +// Thread.currentThread().getThreadGroup());while (true) {try {//判断是否要开启自我保护机制,开启自我保护机制意味着就不能随便再去下线服务了。SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();if (selfProtectionPolicy.isEnable()) {Thread.sleep(CHECK_ALIVE_INTERVAL);continue;}//获取注册表registryRegistry = this.registry.getRegistry();for (String serviceName : registryRegistry.keySet()) {Map<String, ServiceInstance> serviceInstanceMap =registryRegistry.get(serviceName);for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {//服务实例距离上一次心跳已经超过90秒了//就要把这个实例摘除if (!serviceInstance.isAlive()) {registry.remove(serviceInstance.getServiceName(),serviceInstance.getServiceInstanceId());synchronized (SelfProtectionPolicy.class) {//eruake实现方式,下线一个服务就要-2,每个实例每分钟是两次心跳selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}}}}Thread.sleep(CHECK_ALIVE_INTERVAL);} catch (InterruptedException e) {e.printStackTrace();}}}
