NameServer路由发现
NameServer路由发现指的是客户端(Producer或Consumer)在发送或拉取消息时需要获取最新的Topic信息,从而根据Topic信息向对应的Broker发送或拉取消息,而NameServer路由发现就是为客户端(Producer或Consumer)提供的Topic路由信息接口。
RocketMQ的Topic路由信息发生改变时,是不会及时推送给客户端(Producer或Consumer),而是客户端(Producer或Consumer)在启动时会开启一个定时任务线程,每隔一段时间就会拉取并更新Topic路由信息。
1):代码执行
通过之前我们就聊过,RocketMQ的数据交互模式利用的NIO(异步非阻塞),使用的是Netty框架。所以我们根据调用链直接定位到org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest,DefaultRequestProcessor正是实现了NettyRequestProcessor接口。
再根据RequestCode.GET_ROUTEINFO_BY_TOPIC看到正是调用的DefaultRequestProcessor#getRouteInfoByTopic方法。
DefaultRequestProcessor#getRouteInfoByTopic
//创建默认response(默认为异常response)
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//根据请求获取路由请求头
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//根据Topic获取Topic路由数据
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//判定当前Topic是否存在
if (topicRouteData != null) {
//判断是否开启顺序消费
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
//设置顺序消费配置
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
//将Topic路由信息转码,并返回
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//若Topic不存在,返回ResponseCode.TOPIC_NOT_EXIST
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
根据Topic获取Topic路由信息
RouteInfoManager#pickupTopicRouteData
//创建空Topic路由数据
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
//加读锁
this.lock.readLock().lockInterruptibly();
//根据Topic获取Topic队列路由信息集合
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
//若Topic队列路由信息集合不为空
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
//迭代Topic队列路由信息集合
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
//增加BrokerName
brokerNameSet.add(qd.getBrokerName());
}
//迭代BrokerNameSet
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
//生成Broker信息
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
//若需要类过滤,加入类过滤数据
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
//释放读锁
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
//若BrokerData与QueueData都有的情况下,返回Topic路由信息
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
2):逻辑总结
- 通过Netty的Socket通道接收请求数据,请求指令为:RequestCode.GET_ROUTEINFO_BY_TOPIC。
- 生成默认Response(异常response)。
- 根据Topic获取Topic路由数据,使用读锁(共享锁)生成TopicRouteData。TopicRouteData中包含Broker基本信息,Topic队列路由信息集合,类过滤集合信息。
- 判断是否开启顺序消费,若开启加入顺序消费配置。
- 将TopicRouteData转成字节数组放入Response中返回。