客户端拉取Topic路由机制

消息生产者(Producer)在消息发送之前,需要根据本地缓存的Topic路由信息选定Topic对Broker服务器进行消息发送。而为了避免缓存Topic路由信息的过期与失效,所以Producer在启动时开启了一个定时任务线程,每隔30s向NameServer拉取最新的Topic路由信息缓存到本地。
并且在发送消息的多处逻辑中也有拉取Topic路由机制。其原理为当在本地缓存中找不到对应的路由信息,则会从NameServer拉取最新的Topic路由信息缓存到本地。

1):代码执行

查找Topic路由信息的核心代码:、、、DefaultMQProducerImpl#tryToFindTopicPublishInfo

  1. //从主题路由信息集合中获取数据
  2. TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  3. //若为空,则根据Topic从NameServer中更新Topic路由信息数据
  4. if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  5. this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  6. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  7. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  8. }
  9. //若存在,直接返回
  10. if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
  11. return topicPublishInfo;
  12. } else {
  13. //若不存在路由信息或状态异常,从NameServer中更新数据
  14. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
  15. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  16. return topicPublishInfo;
  17. }

MQClientInstance#updateTopicRouteInfoFromNameServer

  1. try {
  2. //加重入锁
  3. if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
  4. try {
  5. TopicRouteData topicRouteData;
  6. if (isDefault && defaultMQProducer != null) {
  7. //更新当前Topic更新Topic路由数据
  8. topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
  9. clientConfig.getMqClientApiTimeout());
  10. if (topicRouteData != null) {
  11. for (QueueData data : topicRouteData.getQueueDatas()) {
  12. //设置队列数
  13. int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
  14. data.setReadQueueNums(queueNums);
  15. data.setWriteQueueNums(queueNums);
  16. }
  17. }
  18. } else {
  19. //更新当前Topic更新Topic路由数据
  20. topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
  21. }
  22. if (topicRouteData != null) {
  23. //获取旧数据
  24. TopicRouteData old = this.topicRouteTable.get(topic);
  25. //检查数据是否需要改变
  26. boolean changed = topicRouteDataIsChange(old, topicRouteData);
  27. if (!changed) {
  28. changed = this.isNeedUpdateTopicRouteInfo(topic);
  29. } else {
  30. log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
  31. }
  32. //若需要改变
  33. if (changed) {
  34. //更新Broker地址集合中的broker名称和地址
  35. TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
  36. for (BrokerData bd : topicRouteData.getBrokerDatas()) {
  37. this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
  38. }
  39. // 更新producer缓存的topic路由信息
  40. {
  41. TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
  42. publishInfo.setHaveTopicRouterInfo(true);
  43. Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
  44. while (it.hasNext()) {
  45. Entry<String, MQProducerInner> entry = it.next();
  46. MQProducerInner impl = entry.getValue();
  47. if (impl != null) {
  48. impl.updateTopicPublishInfo(topic, publishInfo);
  49. }
  50. }
  51. }
  52. //更新consumer缓存的topic路由信息
  53. {
  54. Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  55. Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  56. while (it.hasNext()) {
  57. Entry<String, MQConsumerInner> entry = it.next();
  58. MQConsumerInner impl = entry.getValue();
  59. if (impl != null) {
  60. impl.updateTopicSubscribeInfo(topic, subscribeInfo);
  61. }
  62. }
  63. }
  64. log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
  65. //将Topic路由信息更新到topic路由集合中
  66. this.topicRouteTable.put(topic, cloneTopicRouteData);
  67. return true;
  68. }
  69. } else {
  70. log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
  71. }
  72. } catch (MQClientException e) {
  73. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  74. log.warn("updateTopicRouteInfoFromNameServer Exception", e);
  75. }
  76. } catch (RemotingException e) {
  77. log.error("updateTopicRouteInfoFromNameServer Exception", e);
  78. throw new IllegalStateException(e);
  79. } finally {
  80. //解锁
  81. this.lockNamesrv.unlock();
  82. }
  83. } else {
  84. log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
  85. }
  86. } catch (InterruptedException e) {
  87. log.warn("updateTopicRouteInfoFromNameServer Exception", e);
  88. }
  89. return false;

2):逻辑总结

  1. 根据topic从本地获取topic路由信息
  2. 若该topic路由信息为空,或状态异常,则从NameServer中更新最新的topic路由信息
    1. 加锁
    2. 根据topic,使用Netty的Socket通道发送RequestCode.GET_ROUTEINFO_BY_TOPIC指令获取最新的topic路由信息
    3. 更新读写队列数,Broker地址信息,producer集合或consumer集合中的topic路由缓存数据,topic路由集合
    4. 解锁
  3. 从topic路由集合返回topic新路由信息

    客户端定时任务

    topic路由信息不但需要客户端主要拉取,而且每隔30s需要更新一次来保证最新路由信息数据

    MQClientInstance#startScheduledTask

    1. //默认每隔30s更新一次topic路由信息数据
    2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    7. } catch (Exception e) {
    8. log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
    9. }
    10. }
    11. }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);