客户端拉取Topic路由机制
消息生产者(Producer)在消息发送之前,需要根据本地缓存的Topic路由信息选定Topic对Broker服务器进行消息发送。而为了避免缓存Topic路由信息的过期与失效,所以Producer在启动时开启了一个定时任务线程,每隔30s向NameServer拉取最新的Topic路由信息缓存到本地。
并且在发送消息的多处逻辑中也有拉取Topic路由机制。其原理为当在本地缓存中找不到对应的路由信息,则会从NameServer拉取最新的Topic路由信息缓存到本地。
1):代码执行
查找Topic路由信息的核心代码:、、、DefaultMQProducerImpl#tryToFindTopicPublishInfo
//从主题路由信息集合中获取数据
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//若为空,则根据Topic从NameServer中更新Topic路由信息数据
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//若存在,直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//若不存在路由信息或状态异常,从NameServer中更新数据
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
MQClientInstance#updateTopicRouteInfoFromNameServer
try {
//加重入锁
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//更新当前Topic更新Topic路由数据
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
//设置队列数
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//更新当前Topic更新Topic路由数据
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
//获取旧数据
TopicRouteData old = this.topicRouteTable.get(topic);
//检查数据是否需要改变
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
//若需要改变
if (changed) {
//更新Broker地址集合中的broker名称和地址
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 更新producer缓存的topic路由信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
//更新consumer缓存的topic路由信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
//将Topic路由信息更新到topic路由集合中
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
//解锁
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
2):逻辑总结
- 根据topic从本地获取topic路由信息
- 若该topic路由信息为空,或状态异常,则从NameServer中更新最新的topic路由信息
- 加锁
- 根据topic,使用Netty的Socket通道发送RequestCode.GET_ROUTEINFO_BY_TOPIC指令获取最新的topic路由信息
- 更新读写队列数,Broker地址信息,producer集合或consumer集合中的topic路由缓存数据,topic路由集合
- 解锁
-
客户端定时任务
topic路由信息不但需要客户端主要拉取,而且每隔30s需要更新一次来保证最新路由信息数据
MQClientInstance#startScheduledTask
//默认每隔30s更新一次topic路由信息数据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);