NameServer主动清除

1):代码执行

首先我们可以定位到RouteInfoManager#unregisterBroker。查看其调用链得知是通过Netty的NIO异步回调,通过RequestCode.UNREGISTER_BROKER请求指令调用的unregisterBroker方法。
image.png

  1. try {
  2. try {
  3. //加上写锁
  4. this.lock.writeLock().lockInterruptibly();
  5. //删除Broker的状态信息
  6. BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
  7. log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
  8. brokerLiveInfo != null ? "OK" : "Failed",
  9. brokerAddr
  10. );
  11. //类过滤集合删除Broker的地址
  12. this.filterServerTable.remove(brokerAddr);
  13. boolean removeBrokerName = false;
  14. BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  15. if (null != brokerData) {
  16. //删除BrokerData中的broker
  17. String addr = brokerData.getBrokerAddrs().remove(brokerId);
  18. log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
  19. addr != null ? "OK" : "Failed",
  20. brokerAddr
  21. );
  22. if (brokerData.getBrokerAddrs().isEmpty()) {
  23. //删除Broker地址集合中的Broker名称
  24. this.brokerAddrTable.remove(brokerName);
  25. log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
  26. brokerName
  27. );
  28. removeBrokerName = true;
  29. }
  30. }
  31. //如果该Broker基本信息集合中的集合已经被删除干净的话,清除clusterName
  32. if (removeBrokerName) {
  33. //删除cluster地址集合中的cluster名称
  34. Set<String> nameSet = this.clusterAddrTable.get(clusterName);
  35. if (nameSet != null) {
  36. boolean removed = nameSet.remove(brokerName);
  37. log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
  38. removed ? "OK" : "Failed",
  39. brokerName);
  40. if (nameSet.isEmpty()) {
  41. this.clusterAddrTable.remove(clusterName);
  42. log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
  43. clusterName
  44. );
  45. }
  46. }
  47. //删除Topic信息
  48. this.removeTopicByBrokerName(brokerName);
  49. }
  50. } finally {
  51. //解锁
  52. this.lock.writeLock().unlock();
  53. }
  54. } catch (Exception e) {
  55. log.error("unregisterBroker Exception", e);
  56. }

2):逻辑总结

  1. 通过Netty的Socket通道发送注销指令RequestCode.UNREGISTER_BROKER。
  2. 根据指令调用注销Broker的方法。
  3. clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
  4. 注:在数据清除时加上Lock锁,避免并发造成数据异常。

    NameServer超时剔除

    1):代码执行

    从之前的NameServer启动流程得知,NameServer在启动时初始化开启了一个定时任务线程每隔10s扫描一次不再活跃的Broker。
    org.apache.rocketmq.namesrv.NamesrvController#initialize

    1. //开启定时任务线程,5s后执行,每隔10s执行一次
    2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. //扫描不再活跃的broker
    6. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    7. }
    8. }, 5, 10, TimeUnit.SECONDS);

    RouteInfoManager#scanNotActiveBroker

    1. //获取Broker状态信息集合迭代器
    2. Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    3. while (it.hasNext()) {
    4. Entry<String, BrokerLiveInfo> next = it.next();
    5. //获取最后更新时间戳
    6. long last = next.getValue().getLastUpdateTimestamp();
    7. //进行超时时间比对,看是否超过120s(BROKER_CHANNEL_EXPIRED_TIME=120s)
    8. if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
    9. //关闭NameServer与该Broker的Socket通道
    10. RemotingUtil.closeChannel(next.getValue().getChannel());
    11. //删除Broker状态信息
    12. it.remove();
    13. log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
    14. //执行通道销毁事件
    15. //onChannelDestroy就不展示了,逻辑特别简单。
    16. //简单描述一下就是将各个集合中对应的Broker信息进行清除,并且清除的时候加上Lock锁,避免造成数据异常。
    17. this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
    18. }
    19. }

    2):逻辑总结

  5. 开启定时任务线程,每隔10s扫描一次。

  6. 删除判定规则为:(LastUpdateTimestamp+BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()。
  7. 判定Broker的状态信息中最新时间戳已经超时120s后,关闭NameServer与Broker的Socket连接通道。
  8. clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))中有关于该Broker的数据信息进行清除。
  9. 注:在数据清除时加上Lock锁,避免并发造成数据异常。