NameServer路由发现

NameServer路由发现指的是客户端(Producer或Consumer)在发送或拉取消息时需要获取最新的Topic信息,从而根据Topic信息向对应的Broker发送或拉取消息,而NameServer路由发现就是为客户端(Producer或Consumer)提供的Topic路由信息接口。
RocketMQ的Topic路由信息发生改变时,是不会及时推送给客户端(Producer或Consumer),而是客户端(Producer或Consumer)在启动时会开启一个定时任务线程,每隔一段时间就会拉取并更新Topic路由信息。

1):代码执行

通过之前我们就聊过,RocketMQ的数据交互模式利用的NIO(异步非阻塞),使用的是Netty框架。所以我们根据调用链直接定位到org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequestDefaultRequestProcessor正是实现了NettyRequestProcessor接口。
再根据RequestCode.GET_ROUTEINFO_BY_TOPIC看到正是调用的DefaultRequestProcessor#getRouteInfoByTopic方法。
image.png
DefaultRequestProcessor#getRouteInfoByTopic

  1. //创建默认response(默认为异常response)
  2. final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  3. //根据请求获取路由请求头
  4. final GetRouteInfoRequestHeader requestHeader =
  5. (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
  6. //根据Topic获取Topic路由数据
  7. TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
  8. //判定当前Topic是否存在
  9. if (topicRouteData != null) {
  10. //判断是否开启顺序消费
  11. if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
  12. //设置顺序消费配置
  13. String orderTopicConf =
  14. this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
  15. requestHeader.getTopic());
  16. topicRouteData.setOrderTopicConf(orderTopicConf);
  17. }
  18. //将Topic路由信息转码,并返回
  19. byte[] content = topicRouteData.encode();
  20. response.setBody(content);
  21. response.setCode(ResponseCode.SUCCESS);
  22. response.setRemark(null);
  23. return response;
  24. }
  25. //若Topic不存在,返回ResponseCode.TOPIC_NOT_EXIST
  26. response.setCode(ResponseCode.TOPIC_NOT_EXIST);
  27. response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
  28. + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
  29. return response;

根据Topic获取Topic路由信息
RouteInfoManager#pickupTopicRouteData

  1. //创建空Topic路由数据
  2. TopicRouteData topicRouteData = new TopicRouteData();
  3. boolean foundQueueData = false;
  4. boolean foundBrokerData = false;
  5. Set<String> brokerNameSet = new HashSet<String>();
  6. List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
  7. topicRouteData.setBrokerDatas(brokerDataList);
  8. HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
  9. topicRouteData.setFilterServerTable(filterServerMap);
  10. try {
  11. try {
  12. //加读锁
  13. this.lock.readLock().lockInterruptibly();
  14. //根据Topic获取Topic队列路由信息集合
  15. List<QueueData> queueDataList = this.topicQueueTable.get(topic);
  16. //若Topic队列路由信息集合不为空
  17. if (queueDataList != null) {
  18. topicRouteData.setQueueDatas(queueDataList);
  19. foundQueueData = true;
  20. //迭代Topic队列路由信息集合
  21. Iterator<QueueData> it = queueDataList.iterator();
  22. while (it.hasNext()) {
  23. QueueData qd = it.next();
  24. //增加BrokerName
  25. brokerNameSet.add(qd.getBrokerName());
  26. }
  27. //迭代BrokerNameSet
  28. for (String brokerName : brokerNameSet) {
  29. BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  30. if (null != brokerData) {
  31. BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
  32. .getBrokerAddrs().clone());
  33. //生成Broker信息
  34. brokerDataList.add(brokerDataClone);
  35. foundBrokerData = true;
  36. for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
  37. //若需要类过滤,加入类过滤数据
  38. List<String> filterServerList = this.filterServerTable.get(brokerAddr);
  39. filterServerMap.put(brokerAddr, filterServerList);
  40. }
  41. }
  42. }
  43. }
  44. } finally {
  45. //释放读锁
  46. this.lock.readLock().unlock();
  47. }
  48. } catch (Exception e) {
  49. log.error("pickupTopicRouteData Exception", e);
  50. }
  51. log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
  52. //若BrokerData与QueueData都有的情况下,返回Topic路由信息
  53. if (foundBrokerData && foundQueueData) {
  54. return topicRouteData;
  55. }
  56. return null;


2):逻辑总结

  1. 通过Netty的Socket通道接收请求数据,请求指令为:RequestCode.GET_ROUTEINFO_BY_TOPIC
  2. 生成默认Response(异常response)。
  3. 根据Topic获取Topic路由数据,使用读锁(共享锁)生成TopicRouteData。TopicRouteData中包含Broker基本信息,Topic队列路由信息集合,类过滤集合信息。
  4. 判断是否开启顺序消费,若开启加入顺序消费配置。
  5. 将TopicRouteData转成字节数组放入Response中返回。