NameServer主动清除
1):代码执行
首先我们可以定位到RouteInfoManager#unregisterBroker。查看其调用链得知是通过Netty的NIO异步回调,通过RequestCode.UNREGISTER_BROKER请求指令调用的unregisterBroker方法。
try {try {//加上写锁this.lock.writeLock().lockInterruptibly();//删除Broker的状态信息BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);//类过滤集合删除Broker的地址this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {//删除BrokerData中的brokerString addr = brokerData.getBrokerAddrs().remove(brokerId);log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",addr != null ? "OK" : "Failed",brokerAddr);if (brokerData.getBrokerAddrs().isEmpty()) {//删除Broker地址集合中的Broker名称this.brokerAddrTable.remove(brokerName);log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",brokerName);removeBrokerName = true;}}//如果该Broker基本信息集合中的集合已经被删除干净的话,清除clusterNameif (removeBrokerName) {//删除cluster地址集合中的cluster名称Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",removed ? "OK" : "Failed",brokerName);if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);log.info("unregisterBroker, remove cluster from clusterAddrTable {}",clusterName);}}//删除Topic信息this.removeTopicByBrokerName(brokerName);}} finally {//解锁this.lock.writeLock().unlock();}} catch (Exception e) {log.error("unregisterBroker Exception", e);}
2):逻辑总结
- 通过Netty的Socket通道发送注销指令RequestCode.UNREGISTER_BROKER。
- 根据指令调用注销Broker的方法。
- 将clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
-
NameServer超时剔除
1):代码执行
从之前的NameServer启动流程得知,NameServer在启动时初始化开启了一个定时任务线程每隔10s扫描一次不再活跃的Broker。
org.apache.rocketmq.namesrv.NamesrvController#initialize//开启定时任务线程,5s后执行,每隔10s执行一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//扫描不再活跃的brokerNamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);
RouteInfoManager#scanNotActiveBroker
//获取Broker状态信息集合迭代器Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();//获取最后更新时间戳long last = next.getValue().getLastUpdateTimestamp();//进行超时时间比对,看是否超过120s(BROKER_CHANNEL_EXPIRED_TIME=120s)if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {//关闭NameServer与该Broker的Socket通道RemotingUtil.closeChannel(next.getValue().getChannel());//删除Broker状态信息it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);//执行通道销毁事件//onChannelDestroy就不展示了,逻辑特别简单。//简单描述一下就是将各个集合中对应的Broker信息进行清除,并且清除的时候加上Lock锁,避免造成数据异常。this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}
2):逻辑总结
开启定时任务线程,每隔10s扫描一次。
- 删除判定规则为:(LastUpdateTimestamp+BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()。
- 判定Broker的状态信息中最新时间戳已经超时120s后,关闭NameServer与Broker的Socket连接通道。
- 将clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
- 注:在数据清除时加上Lock锁,避免并发造成数据异常。
