NameServer主动清除
1):代码执行
首先我们可以定位到RouteInfoManager#unregisterBroker。查看其调用链得知是通过Netty的NIO异步回调,通过RequestCode.UNREGISTER_BROKER请求指令调用的unregisterBroker方法。
try {
try {
//加上写锁
this.lock.writeLock().lockInterruptibly();
//删除Broker的状态信息
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
brokerLiveInfo != null ? "OK" : "Failed",
brokerAddr
);
//类过滤集合删除Broker的地址
this.filterServerTable.remove(brokerAddr);
boolean removeBrokerName = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
//删除BrokerData中的broker
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
addr != null ? "OK" : "Failed",
brokerAddr
);
if (brokerData.getBrokerAddrs().isEmpty()) {
//删除Broker地址集合中的Broker名称
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
brokerName
);
removeBrokerName = true;
}
}
//如果该Broker基本信息集合中的集合已经被删除干净的话,清除clusterName
if (removeBrokerName) {
//删除cluster地址集合中的cluster名称
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
removed ? "OK" : "Failed",
brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
clusterName
);
}
}
//删除Topic信息
this.removeTopicByBrokerName(brokerName);
}
} finally {
//解锁
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("unregisterBroker Exception", e);
}
2):逻辑总结
- 通过Netty的Socket通道发送注销指令RequestCode.UNREGISTER_BROKER。
- 根据指令调用注销Broker的方法。
- 将clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
-
NameServer超时剔除
1):代码执行
从之前的NameServer启动流程得知,NameServer在启动时初始化开启了一个定时任务线程每隔10s扫描一次不再活跃的Broker。
org.apache.rocketmq.namesrv.NamesrvController#initialize//开启定时任务线程,5s后执行,每隔10s执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//扫描不再活跃的broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
RouteInfoManager#scanNotActiveBroker
//获取Broker状态信息集合迭代器
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
//获取最后更新时间戳
long last = next.getValue().getLastUpdateTimestamp();
//进行超时时间比对,看是否超过120s(BROKER_CHANNEL_EXPIRED_TIME=120s)
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
//关闭NameServer与该Broker的Socket通道
RemotingUtil.closeChannel(next.getValue().getChannel());
//删除Broker状态信息
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
//执行通道销毁事件
//onChannelDestroy就不展示了,逻辑特别简单。
//简单描述一下就是将各个集合中对应的Broker信息进行清除,并且清除的时候加上Lock锁,避免造成数据异常。
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
2):逻辑总结
开启定时任务线程,每隔10s扫描一次。
- 删除判定规则为:(LastUpdateTimestamp+BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()。
- 判定Broker的状态信息中最新时间戳已经超时120s后,关闭NameServer与Broker的Socket连接通道。
- 将clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
- 注:在数据清除时加上Lock锁,避免并发造成数据异常。