客户端拉取Topic路由机制
消息生产者(Producer)在消息发送之前,需要根据本地缓存的Topic路由信息选定Topic对Broker服务器进行消息发送。而为了避免缓存Topic路由信息的过期与失效,所以Producer在启动时开启了一个定时任务线程,每隔30s向NameServer拉取最新的Topic路由信息缓存到本地。
并且在发送消息的多处逻辑中也有拉取Topic路由机制。其原理为当在本地缓存中找不到对应的路由信息,则会从NameServer拉取最新的Topic路由信息缓存到本地。
1):代码执行
查找Topic路由信息的核心代码:、、、DefaultMQProducerImpl#tryToFindTopicPublishInfo
//从主题路由信息集合中获取数据TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//若为空,则根据Topic从NameServer中更新Topic路由信息数据if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}//若存在,直接返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//若不存在路由信息或状态异常,从NameServer中更新数据this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
MQClientInstance#updateTopicRouteInfoFromNameServer
try {//加重入锁if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {//更新当前Topic更新Topic路由数据topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {//设置队列数int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {//更新当前Topic更新Topic路由数据topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());}if (topicRouteData != null) {//获取旧数据TopicRouteData old = this.topicRouteTable.get(topic);//检查数据是否需要改变boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}//若需要改变if (changed) {//更新Broker地址集合中的broker名称和地址TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// 更新producer缓存的topic路由信息{TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {impl.updateTopicPublishInfo(topic, publishInfo);}}}//更新consumer缓存的topic路由信息{Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);//将Topic路由信息更新到topic路由集合中this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);}} catch (MQClientException e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} catch (RemotingException e) {log.error("updateTopicRouteInfoFromNameServer Exception", e);throw new IllegalStateException(e);} finally {//解锁this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;
2):逻辑总结
- 根据topic从本地获取topic路由信息
- 若该topic路由信息为空,或状态异常,则从NameServer中更新最新的topic路由信息
- 加锁
- 根据topic,使用Netty的Socket通道发送RequestCode.GET_ROUTEINFO_BY_TOPIC指令获取最新的topic路由信息
- 更新读写队列数,Broker地址信息,producer集合或consumer集合中的topic路由缓存数据,topic路由集合
- 解锁
-
客户端定时任务
topic路由信息不但需要客户端主要拉取,而且每隔30s需要更新一次来保证最新路由信息数据
MQClientInstance#startScheduledTask
//默认每隔30s更新一次topic路由信息数据this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
