所谓的故障延迟机制指的是当客户端在发送消息时,将消息发送给指定Broker时发生异常情况。在这种情况下,为了保证重试机制的成功,会暂时将该Broker设置为短暂不可用。等再次发送给该Broker时成功后,会将该状态改为可用。<br />为什么客户端在发现Broker故障时不进行立刻清除的原因是客户端更新Topic路由信息是由定时任务来执行。至于为什么会存在Broker故障的情况,而NameServer依旧将这样的信息返回给客户端是因为NameServer与Broker保持心跳连接也是有延迟的。所以才会有这种两种情况所在。<br />**sendLatencyFaultEnable=false,**默认为不启用Broker故障延迟机制<br />**sendLatencyFaultEnable=true,**启用故障延迟机制
MQ失败策略:MQFaultStrategy
public class MQFaultStrategy {
private final static InternalLogger log = ClientLogger.getLog();
//故障延迟接口实现
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//是否启用故障延迟机制,默认false
private boolean sendLatencyFaultEnable = false;
//延迟最大时间
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用持续时间
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
../部分代码省略
故障延迟接口:LatencyFaultTolerance
public interface LatencyFaultTolerance<T> {
//更新失败条目(broker名称,消息发送故障的延迟时间,不可用持续时间)
void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
//判断Broker是否可用
boolean isAvailable(final T name);
//移除失败条目,意味着该Broker恢复使用
void remove(final T name);
//尝试从规避的Broker中选择一个可用的Broker,没有返回空
T pickOneAtLeast();
}
失败条目:FaultItem
class FaultItem implements Comparable<FaultItem> {
//Broker名称
private final String name;
//本次消息的延迟时间
private volatile long currentLatency;
//故障规避的开始时间
private volatile long startTimestamp;
../部分代码省略
故障延迟机制
1):故障延迟机制代码执行
首先定位到MQFaultStrategy#selectOneMessageQueue,它的入口是DefaultMQProducerImpl#sendDefaultImpl。
TopicPublishInfo:Topic路由信息
LastBrokerName:上一次选择的BrokerName
//判断是否开启故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
//index % 队列数量 = pos,根据pos获取队列
//简单描述就是循环遍历当前队列,判断队列是否可用,可用就返回
//不可用就从失败条目中随机挑选一个
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
//pos会小于0?没看懂
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
//尝试从失败item中选出一个可用的Broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//获取写队列数
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
//若写队列数大于0,说明该Broker是存在的
if (writeQueueNums > 0) {
//随机选一个MQ队列,然后设置返回(这块稍有没看懂)
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
//设置该消息队列属性
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
//从失败条目中删除该Broker,表示该Broker可用或不存在
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//选择一个消息队列
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
//尝试在规避的Broker中随机选择一个可用的broker,没有找到即返回
LatencyFaultToleranceImpl#pickOneAtLeast
//ConcurrentHashMap转LinkedList
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
//若该链表的为空
if (!tmpList.isEmpty()) {
//说实话这两句没太看懂,为什么要打乱顺序再排序
//打乱顺序
Collections.shuffle(tmpList);
//排序
Collections.sort(tmpList);
//取失败条目中间值
final int half = tmpList.size() / 2;
if (half <= 0) {
//如果小于等于0,取第一个
return tmpList.get(0).getName();
} else {
//否则使用index % 一半
final int i = this.whichItemWorst.incrementAndGet() % half;
//返回该broker
return tmpList.get(i).getName();
}
}
return null;
MQFaultStrategy#updateFaultItem
//若已开启故障延迟机制
if (this.sendLatencyFaultEnable) {
//开启隔离?30s:当前延迟时间(这个时间是消息发送的耗费时间)
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
//更新失败条目
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
MQFaultStrategy#computeNotAvailableDuration
//latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
//从后往前找一个比currentLantency小的索引,再从notAvailableDuration选出对应的时间
//在这个时间之内,Broker不可用。
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
2):故障延迟机制逻辑总结
- 选择队列规则:
- 根据index的新增遍历Topic的MQ队列,判断可用即返回该MQ队列
- 若都不可用,则从失败条目中随机挑一个。
- 若该Broker的写队列大于0,则从Topic的MQ队列列表中选一个MQ,把当前Broker的信息覆盖上去,返回该MQ。(这块逻辑没太捋明白,后面再补充)
- 若不大于0,说明该队列是好用的,从失败条目中移除(要不就是该队列不存在)。后面再选一个MQ队列返回。
- 更新失败条目规则:
- 判断当前是否需要隔离返回当前延迟时间(true:30s,false:发送消息前后消耗时间)
- 从后往前找一个比currentLantency小的索引,再从notAvailableDuration选出对应的时间,在这个时间之内,Broker不可用。
- 延迟时间越高,不可用持续时间越高。当延迟时间=30000时,不可用持续时间为600s,也就是10分钟。
- 故障延迟机制逻辑概括总结:
- 当开启故障延迟机制时,根据Topic路由信息进行筛选MQ队列。
- 当Topic路由信息MQ队列集合中的MQ队列都不符合要求时,则会从失败条目集群中随机挑选broker作为MQ来使。
- 在将消息发送给MQ队列中时,计算耗时。耗时的时间越长,那么Broker的故障延迟时间也越长。也就是说当该Broker的MQ队列负载过高导致的延迟时间过长或者该Broker的MQ队列故障时,RocketMQ的故障延迟机制自动的会将该MQ队列放入失败条目,避免发送消息时再对该MQ队列进行消息发送,避免压力或者重试。