微服务注册中心自我保护机制思想

spring cloud eureka,微服务注册中心里,有一个东西,他是说有一个机制,自我保护机制。
register-server部署的机器出现了故障,导致网络有了问题,导致大量的服务实例都没有办法发送心跳信息过来,导致register-server上的ServiceAliveMonitor线程在检查的时候,会发现大面积的出现各个服务实例超过90秒没发送心跳。
现在有一个解决办法就是把这些超过90秒没有发心跳的实例全部摘除,这样靠谱吗?不靠谱!
就需要一个自我保护机制,这个自我保护就是说什么情况下可以让register-server认为是自己的网络问题导致别人的心跳发不过来呢?有一个比例,spring cloud eureka提供的一个比例是25%,超过25%的服务实例的心跳都没法过来。
案例中,如果ServiceAliveMonitor发现超过25%的服务实例的心跳,都没及时更新,就可以认为是可能register-server自己部署的机器的网络出现了故障,导致别的服务的心跳发不过来。
这个时候,开始自动进入自我保护机制,他不再摘除任何的服务实例,避免说在自己网络故障的情况下,一下子就摘除50%,60%的服务实例,导致注册表的数据出现严重的数据缺失。
如果后面再次运行的时候,发现已经有超过85%的服务实例其实恢复了发送心跳了。此时ServiceAliveMonitor可以自动退出自我保护的状态,可以继续去检查是不是某个服务实例他的心跳没有发送过来,没有及时更新,超过90秒还没更新,此时认为这个服务实例就宕机了,就要摘除。
那么应该如何去计算这比例呢?

  • 可以收集一下每分钟的心跳总次数。比如说,现在有10个服务实例,按理说每分钟应该有20次心跳,但是某一分钟你发现收到的心跳次数只有8次,此时就发现8 < 20 * 0.85,也就是说,发现有超过25%的服务实例的心跳没有正常的发送。如果是这种情况,则认为是自己的网络出了问题,进入自我保护机制,不再摘除服务实例了。
  • 如果一分钟的心跳次数,达到了18,18 > 20 * 0.85,此时就认为是服务注册中心的网络肯定是正常了,此时就退出自我保护机制,就可以正常的检查服务实例的心跳是否在90秒内更新过,如果没更新过,就可以自动摘除这个故障实例

    synchronized实现服务的心跳计数器

    添加一个计数器组件

    1. /**
    2. * 心跳测量计数器
    3. */
    4. public class HeartbeatMessuredRate {
    5. private static HeartbeatMessuredRate instance = new HeartbeatMessuredRate();
    6. /**
    7. *最近一分钟心跳次数
    8. */
    9. private long latestMinuteHeartbeatRate = 0L;
    10. /**
    11. * 最近一分钟时间戳
    12. */
    13. private long latestMinuteTimestamp = System.currentTimeMillis();
    14. public static HeartbeatMessuredRate getInstance() {
    15. return instance;
    16. }
    17. public synchronized void increment() {
    18. long currentTime = System.currentTimeMillis();
    19. //每隔60秒把这个心跳次数清零,重新计数
    20. if (currentTime - latestMinuteTimestamp > 60 * 1000) {
    21. latestMinuteHeartbeatRate = 0l;
    22. this.latestMinuteTimestamp = System.currentTimeMillis();
    23. }
    24. latestMinuteHeartbeatRate++;
    25. }
    26. /**
    27. * 获取最近一分钟的心跳次数
    28. */
    29. public long get() {
    30. return latestMinuteHeartbeatRate;
    31. }
    32. }

    服务下线

    在通信组件HttpSender中添加下面方法,这个方法实际上是要通过rpc去调用服务端对应的下线方法

    1. /**
    2. * 服务下线
    3. * @param serviceName
    4. * @param serviceInstanceId
    5. */
    6. public void cancel(String serviceName, String serviceInstanceId) {
    7. System.out.println("服务实例下线【"+serviceName+","+serviceInstanceId+"】");
    8. }

    自我保护机制阈值修改

    新建自我保护机制类

    1. public class SelfProtectionPolicy {
    2. private static SelfProtectionPolicy instance = new SelfProtectionPolicy();
    3. /**
    4. * 期望的心跳数
    5. */
    6. private long expectedHeartbeatRate = 0L;
    7. /**
    8. * 期望的心跳次数阈值,
    9. */
    10. private long expectedHeartbeatThreshold = 0L;
    11. public static SelfProtectionPolicy getInstance() {
    12. return instance;
    13. }
    14. }

    上线服务和下线服务的方法中增加下面代码

    1. //更新自我保护机制阈值,要加锁,因为这个SelfProtectionPolicy对象的值会被不断的修改
    2. synchronized (SelfProtectionPolicy.class) {
    3. SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
    4. //eruake实现方式,上线一个服务就要+2下线就是-2,每个实例每分钟是两次心跳
    5. selfProtectionPolicy.setExpectedHeartbeatRate(
    6. selfProtectionPolicy.getExpectedHeartbeatRate() + 2);
    7. selfProtectionPolicy.setExpectedHeartbeatThreshold(
    8. (long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
    9. }

    基于Synchronized开启自我保护机制

    在代码监控线程中实现

    1. public void run() {
    2. Map<String, Map<String, ServiceInstance>> registryRegistry = null;
    3. // System.out.println(Thread.currentThread().getName() + "线程的线程组是:" +
    4. // Thread.currentThread().getThreadGroup());
    5. while (true) {
    6. try {
    7. //判断是否要开启自我保护机制,开启自我保护机制意味着就不能随便再去下线服务了。
    8. SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
    9. if (selfProtectionPolicy.isEnable()) {
    10. Thread.sleep(CHECK_ALIVE_INTERVAL);
    11. continue;
    12. }
    13. //获取注册表
    14. registryRegistry = this.registry.getRegistry();
    15. for (String serviceName : registryRegistry.keySet()) {
    16. Map<String, ServiceInstance> serviceInstanceMap =
    17. registryRegistry.get(serviceName);
    18. for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {
    19. //服务实例距离上一次心跳已经超过90秒了
    20. //就要把这个实例摘除
    21. if (!serviceInstance.isAlive()) {
    22. registry.remove(serviceInstance.getServiceName(),
    23. serviceInstance.getServiceInstanceId());
    24. synchronized (SelfProtectionPolicy.class) {
    25. //eruake实现方式,下线一个服务就要-2,每个实例每分钟是两次心跳
    26. selfProtectionPolicy.setExpectedHeartbeatRate(
    27. selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
    28. selfProtectionPolicy.setExpectedHeartbeatThreshold(
    29. (long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
    30. }
    31. }
    32. }
    33. }
    34. Thread.sleep(CHECK_ALIVE_INTERVAL);
    35. } catch (InterruptedException e) {
    36. e.printStackTrace();
    37. }
    38. }
    39. }