1. 所谓的故障延迟机制指的是当客户端在发送消息时,将消息发送给指定Broker时发生异常情况。在这种情况下,为了保证重试机制的成功,会暂时将该Broker设置为短暂不可用。等再次发送给该Broker时成功后,会将该状态改为可用。<br />为什么客户端在发现Broker故障时不进行立刻清除的原因是客户端更新Topic路由信息是由定时任务来执行。至于为什么会存在Broker故障的情况,而NameServer依旧将这样的信息返回给客户端是因为NameServerBroker保持心跳连接也是有延迟的。所以才会有这种两种情况所在。<br />**sendLatencyFaultEnable=false,**默认为不启用Broker故障延迟机制<br />**sendLatencyFaultEnable=true,**启用故障延迟机制

MQ失败策略:MQFaultStrategy

  1. public class MQFaultStrategy {
  2. private final static InternalLogger log = ClientLogger.getLog();
  3. //故障延迟接口实现
  4. private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
  5. //是否启用故障延迟机制,默认false
  6. private boolean sendLatencyFaultEnable = false;
  7. //延迟最大时间
  8. private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
  9. //不可用持续时间
  10. private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  11. ../部分代码省略

故障延迟接口:LatencyFaultTolerance

  1. public interface LatencyFaultTolerance<T> {
  2. //更新失败条目(broker名称,消息发送故障的延迟时间,不可用持续时间)
  3. void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
  4. //判断Broker是否可用
  5. boolean isAvailable(final T name);
  6. //移除失败条目,意味着该Broker恢复使用
  7. void remove(final T name);
  8. //尝试从规避的Broker中选择一个可用的Broker,没有返回空
  9. T pickOneAtLeast();
  10. }

失败条目:FaultItem

  1. class FaultItem implements Comparable<FaultItem> {
  2. //Broker名称
  3. private final String name;
  4. //本次消息的延迟时间
  5. private volatile long currentLatency;
  6. //故障规避的开始时间
  7. private volatile long startTimestamp;
  8. ../部分代码省略

故障延迟机制

1):故障延迟机制代码执行

首先定位到MQFaultStrategy#selectOneMessageQueue,它的入口是DefaultMQProducerImpl#sendDefaultImpl。
TopicPublishInfo:Topic路由信息
LastBrokerName:上一次选择的BrokerName

  1. //判断是否开启故障延迟机制
  2. if (this.sendLatencyFaultEnable) {
  3. try {
  4. //index % 队列数量 = pos,根据pos获取队列
  5. //简单描述就是循环遍历当前队列,判断队列是否可用,可用就返回
  6. //不可用就从失败条目中随机挑选一个
  7. int index = tpInfo.getSendWhichQueue().incrementAndGet();
  8. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
  9. int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
  10. //pos会小于0?没看懂
  11. if (pos < 0)
  12. pos = 0;
  13. MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  14. if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
  15. return mq;
  16. }
  17. //尝试从失败item中选出一个可用的Broker
  18. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
  19. //获取写队列数
  20. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
  21. //若写队列数大于0,说明该Broker是存在的
  22. if (writeQueueNums > 0) {
  23. //随机选一个MQ队列,然后设置返回(这块稍有没看懂)
  24. final MessageQueue mq = tpInfo.selectOneMessageQueue();
  25. if (notBestBroker != null) {
  26. //设置该消息队列属性
  27. mq.setBrokerName(notBestBroker);
  28. mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
  29. }
  30. return mq;
  31. } else {
  32. //从失败条目中删除该Broker,表示该Broker可用或不存在
  33. latencyFaultTolerance.remove(notBestBroker);
  34. }
  35. } catch (Exception e) {
  36. log.error("Error occurred when selecting message queue", e);
  37. }
  38. //选择一个消息队列
  39. return tpInfo.selectOneMessageQueue();
  40. }
  41. return tpInfo.selectOneMessageQueue(lastBrokerName);

//尝试在规避的Broker中随机选择一个可用的broker,没有找到即返回
LatencyFaultToleranceImpl#pickOneAtLeast

  1. //ConcurrentHashMap转LinkedList
  2. final Enumeration<FaultItem> elements = this.faultItemTable.elements();
  3. List<FaultItem> tmpList = new LinkedList<FaultItem>();
  4. while (elements.hasMoreElements()) {
  5. final FaultItem faultItem = elements.nextElement();
  6. tmpList.add(faultItem);
  7. }
  8. //若该链表的为空
  9. if (!tmpList.isEmpty()) {
  10. //说实话这两句没太看懂,为什么要打乱顺序再排序
  11. //打乱顺序
  12. Collections.shuffle(tmpList);
  13. //排序
  14. Collections.sort(tmpList);
  15. //取失败条目中间值
  16. final int half = tmpList.size() / 2;
  17. if (half <= 0) {
  18. //如果小于等于0,取第一个
  19. return tmpList.get(0).getName();
  20. } else {
  21. //否则使用index % 一半
  22. final int i = this.whichItemWorst.incrementAndGet() % half;
  23. //返回该broker
  24. return tmpList.get(i).getName();
  25. }
  26. }
  27. return null;

MQFaultStrategy#updateFaultItem

  1. //若已开启故障延迟机制
  2. if (this.sendLatencyFaultEnable) {
  3. //开启隔离?30s:当前延迟时间(这个时间是消息发送的耗费时间)
  4. long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
  5. //更新失败条目
  6. this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
  7. }

MQFaultStrategy#computeNotAvailableDuration

  1. //latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
  2. //notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  3. //从后往前找一个比currentLantency小的索引,再从notAvailableDuration选出对应的时间
  4. //在这个时间之内,Broker不可用。
  5. for (int i = latencyMax.length - 1; i >= 0; i--) {
  6. if (currentLatency >= latencyMax[i])
  7. return this.notAvailableDuration[i];
  8. }
  9. return 0;

2):故障延迟机制逻辑总结

  1. 选择队列规则:
    1. 根据index的新增遍历Topic的MQ队列,判断可用即返回该MQ队列
    2. 若都不可用,则从失败条目中随机挑一个。
      1. 若该Broker的写队列大于0,则从Topic的MQ队列列表中选一个MQ,把当前Broker的信息覆盖上去,返回该MQ。(这块逻辑没太捋明白,后面再补充)
      2. 若不大于0,说明该队列是好用的,从失败条目中移除(要不就是该队列不存在)。后面再选一个MQ队列返回。
  2. 更新失败条目规则:
    1. 判断当前是否需要隔离返回当前延迟时间(true:30s,false:发送消息前后消耗时间)
    2. 从后往前找一个比currentLantency小的索引,再从notAvailableDuration选出对应的时间,在这个时间之内,Broker不可用。
    3. 延迟时间越高,不可用持续时间越高。当延迟时间=30000时,不可用持续时间为600s,也就是10分钟。
  3. 故障延迟机制逻辑概括总结:
    1. 当开启故障延迟机制时,根据Topic路由信息进行筛选MQ队列。
    2. 当Topic路由信息MQ队列集合中的MQ队列都不符合要求时,则会从失败条目集群中随机挑选broker作为MQ来使。
    3. 在将消息发送给MQ队列中时,计算耗时。耗时的时间越长,那么Broker的故障延迟时间也越长。也就是说当该Broker的MQ队列负载过高导致的延迟时间过长或者该Broker的MQ队列故障时,RocketMQ的故障延迟机制自动的会将该MQ队列放入失败条目,避免发送消息时再对该MQ队列进行消息发送,避免压力或者重试。