NameServer路由发现
NameServer路由发现指的是客户端(Producer或Consumer)在发送或拉取消息时需要获取最新的Topic信息,从而根据Topic信息向对应的Broker发送或拉取消息,而NameServer路由发现就是为客户端(Producer或Consumer)提供的Topic路由信息接口。
RocketMQ的Topic路由信息发生改变时,是不会及时推送给客户端(Producer或Consumer),而是客户端(Producer或Consumer)在启动时会开启一个定时任务线程,每隔一段时间就会拉取并更新Topic路由信息。
1):代码执行
通过之前我们就聊过,RocketMQ的数据交互模式利用的NIO(异步非阻塞),使用的是Netty框架。所以我们根据调用链直接定位到org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest,DefaultRequestProcessor正是实现了NettyRequestProcessor接口。
再根据RequestCode.GET_ROUTEINFO_BY_TOPIC看到正是调用的DefaultRequestProcessor#getRouteInfoByTopic方法。
DefaultRequestProcessor#getRouteInfoByTopic
//创建默认response(默认为异常response)final RemotingCommand response = RemotingCommand.createResponseCommand(null);//根据请求获取路由请求头final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);//根据Topic获取Topic路由数据TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());//判定当前Topic是否存在if (topicRouteData != null) {//判断是否开启顺序消费if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {//设置顺序消费配置String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}//将Topic路由信息转码,并返回byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//若Topic不存在,返回ResponseCode.TOPIC_NOT_EXISTresponse.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;
根据Topic获取Topic路由信息
RouteInfoManager#pickupTopicRouteData
//创建空Topic路由数据TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {//加读锁this.lock.readLock().lockInterruptibly();//根据Topic获取Topic队列路由信息集合List<QueueData> queueDataList = this.topicQueueTable.get(topic);//若Topic队列路由信息集合不为空if (queueDataList != null) {topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;//迭代Topic队列路由信息集合Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();//增加BrokerNamebrokerNameSet.add(qd.getBrokerName());}//迭代BrokerNameSetfor (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());//生成Broker信息brokerDataList.add(brokerDataClone);foundBrokerData = true;for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {//若需要类过滤,加入类过滤数据List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {//释放读锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);//若BrokerData与QueueData都有的情况下,返回Topic路由信息if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;
2):逻辑总结
- 通过Netty的Socket通道接收请求数据,请求指令为:RequestCode.GET_ROUTEINFO_BY_TOPIC。
- 生成默认Response(异常response)。
- 根据Topic获取Topic路由数据,使用读锁(共享锁)生成TopicRouteData。TopicRouteData中包含Broker基本信息,Topic队列路由信息集合,类过滤集合信息。
- 判断是否开启顺序消费,若开启加入顺序消费配置。
- 将TopicRouteData转成字节数组放入Response中返回。
