NameServer作为RocketMQ的管理“大脑”,主要为Producer和Consumer提供Broker的信息。它能够管理Broker的各个节点,保存元数据信息。可以让Broker进行路由注册,心跳续约,也可以剔除Broker。

路由元信息

NameServer中用于存储Broker路由元信息的实现类为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager。

  1. //Broker超时时间:默认120s
  2. long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
  3. //读写锁,注册时避免并发
  4. ReadWriteLock lock = new ReentrantReadWriteLock();
  5. //Topic队列路由信息集合,消息发送时根据路由表进行负载均衡
  6. HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  7. //Broker基本信息集合
  8. HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  9. //Broker集群信息,存储集群中所有的brokerName
  10. HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
  11. //Broker状态信息,NameServer根据brokerLiveTable中的时间戳来判断是否剔除该Broker
  12. HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  13. //类模式消息过滤(4.4版本后废弃)
  14. HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

org.apache.rocketmq.common.protocol.route.QueueData类结构

  1. public class QueueData implements Comparable<QueueData> {
  2. //Broker的名称
  3. private String brokerName;
  4. //读取队列数
  5. private int readQueueNums;
  6. //写入队列数
  7. private int writeQueueNums;
  8. //读写权限
  9. private int perm;
  10. //Topic系统标记(topicSysFlag)
  11. private int topicSysFlag;
  12. ../后续省略

org.apache.rocketmq.common.protocol.route.BrokerData类结构

  1. public class BrokerData implements Comparable<BrokerData> {
  2. //集群名称
  3. private String cluster;
  4. //Broker名称
  5. private String brokerName;
  6. //Broker地址列表,brokerId=0表示Master,大于0表示Slave
  7. private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
  8. //根据随机数选取Broker地址
  9. private final Random random = new Random();
  10. ../后续省略

org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo类结构

  1. class BrokerLiveInfo {
  2. //最新时间戳
  3. private long lastUpdateTimestamp;
  4. //数据版本号:Broker上报的Topic配置版本,NameServer据此判断该Broker的Topic配置是否发生变化
  5. private DataVersion dataVersion;
  6. //通道
  7. private Channel channel;
  8. //该Broker的HA服务地址(主从同步时Slave通过Master的该地址进行连接)
  9. private String haServerAddr;
  10. ../后续省略

路由注册

RocketMQ的路由注册是通过Broker与NameServer之间的心跳机制实现的。它与Nacos注册中心(AP模式)稍有不同。Nacos既有注册,也有心跳续约,二者逻辑类似:注册时若发现已注册过就更新时间戳,心跳续约时若发现已被剔除就重新注册。而RocketMQ只有心跳功能:未注册则注册,已注册则更新时间戳。另一个不同点是,Nacos的client只选择某一个server进行注册,再由该server异步同步到其他server节点,使用的是HTTP请求。RocketMQ的Broker则同时向集群中所有NameServer发送心跳包,NameServer集群节点之间互不通信,是无状态的。此外RocketMQ使用的是基于Socket(Netty)的TCP通信,Broker与NameServer之间保持长连接。
Broker启动时会向NameServer集群的所有节点发送心跳,之后默认每隔30s发送一次心跳包进行续约。NameServer收到心跳包后,会更新brokerLiveTable中该Broker的最新时间戳。另外,NameServer还有一个独立的定时任务,每隔10s扫描一次brokerLiveTable。若某个Broker超过120s没有更新时间戳,就将其剔除,同时关闭对应的Socket连接。

Broker发送心跳包

1):代码执行

Broker的启动类为:org.apache.rocketmq.broker.BrokerStartup。跟踪启动过程可以发现,心跳包是在BrokerController中通过一个定时线程任务来发送的(见下方代码)。

  1. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  2. startProcessorByHa(messageStoreConfig.getBrokerRole());
  3. handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
  4. //启动时立即向所有NameServer注册Broker
  5. this.registerBrokerAll(true, false, true);
  6. }
  7. //定时线程任务:首次延迟10s执行,之后按registerNameServerPeriod周期发送(默认30s,被限制在10s~60s之间)
  8. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. //周期性地向所有NameServer注册Broker(即发送心跳)
  13. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  14. } catch (Throwable e) {
  15. log.error("registerBrokerAll Exception", e);
  16. }
  17. }
  18. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

BrokerController.this.registerBrokerAll(true, false, true);

  1. public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
  2. ../部分代码省略
  3. //判断是否需要注册
  4. if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
  5. this.getBrokerAddr(),
  6. this.brokerConfig.getBrokerName(),
  7. this.brokerConfig.getBrokerId(),
  8. //注册超时时间:默认6s
  9. this.brokerConfig.getRegisterBrokerTimeoutMills())) {
  10. //执行注册逻辑
  11. doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
  12. }
  13. }

doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);

  1. //调用brokerOuterAPI对所有NameServer集群进行注册,并返回注册的结果集
  2. List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
  3. this.brokerConfig.getBrokerClusterName(),
  4. this.getBrokerAddr(),
  5. this.brokerConfig.getBrokerName(),
  6. this.brokerConfig.getBrokerId(),
  7. this.getHAServerAddr(),
  8. topicConfigWrapper,
  9. this.filterServerManager.buildNewFilterServerList(),
  10. oneway,
  11. this.brokerConfig.getRegisterBrokerTimeoutMills(),
  12. this.brokerConfig.isCompressedRegister());
  13. ../部分代码省略

this.brokerOuterAPI.registerBrokerAll

  1. //创建结果集集合,用来存储注册的结果
  2. final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
  3. //获取需要进行注册的NameServer集合
  4. List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
  5. //判断NameServer地址集合是否数量大于0
  6. if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
  7. //设置注册Broker请求头
  8. final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
  9. //Broker地址
  10. requestHeader.setBrokerAddr(brokerAddr);
  11. //BrokerId
  12. requestHeader.setBrokerId(brokerId);
  13. //Broker名称
  14. requestHeader.setBrokerName(brokerName);
  15. //集群名称
  16. requestHeader.setClusterName(clusterName);
  17. //该Broker的HA服务地址(用于主从同步)
  18. requestHeader.setHaServerAddr(haServerAddr);
  19. //请求体是否压缩(与下方requestBody.encode(compressed)对应)
  20. requestHeader.setCompressed(compressed);
  21. //注册Broker体
  22. RegisterBrokerBody requestBody = new RegisterBrokerBody();
  23. //设置目前的Topic主题配置
  24. requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
  25. //设置过滤服务列表
  26. requestBody.setFilterServerList(filterServerList);
  27. //将请求体编码为字节数组,compressed决定是否压缩
  28. final byte[] body = requestBody.encode(compressed);
  29. //计算请求体的CRC32校验值
  30. final int bodyCrc32 = UtilAll.crc32(body);
  31. //在请求头中设置请求体的CRC32校验值
  32. requestHeader.setBodyCrc32(bodyCrc32);
  33. //创建计数为NameServer数量的CountDownLatch(闭锁),用于阻塞当前线程,
  34. //待所有NameServer的异步注册任务都执行完毕(或等待超时)后再继续往下执行
  35. final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
  36. //循环遍历NameServer地址集合进行注册
  37. for (final String namesrvAddr : nameServerAddressList) {
  38. //利用BrokerOuter线程池线程进行注册
  39. brokerOuterExecutor.execute(new Runnable() {
  40. @Override
  41. public void run() {
  42. try {
  43. //注册并获取注册结果
  44. RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
  45. if (result != null) {
  46. //结果集不为空的情况下,放入结果集集合
  47. registerBrokerResultList.add(result);
  48. }
  49. log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
  50. } catch (Exception e) {
  51. log.warn("registerBroker Exception, {}", namesrvAddr, e);
  52. } finally {
  53. //计数减1(注册成功或异常都会执行),当计数减到0时唤醒await的线程
  54. countDownLatch.countDown();
  55. }
  56. }
  57. });
  58. }
  59. try {
  60. //阻塞等待,直到计数归零被唤醒,或等待超时(timeoutMills,默认6s)后自动返回
  61. countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
  62. } catch (InterruptedException e) {
  63. }
  64. }

registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

  1. //创建请求码为REGISTER_BROKER的请求指令,并设置请求体
  2. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
  3. request.setBody(body);
  4. //判断是否为单向(oneway)发送:只发送请求、不等待响应,效率高,但无法得知是否注册成功
  5. if (oneway) {
  6. try {
  7. //使用oneway发送注册Broker请求
  8. this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
  9. } catch (RemotingTooMuchRequestException e) {
  10. // Ignore
  11. }
  12. return null;
  13. }
  14. //使用同步方式(invokeSync)发送Broker注册请求,阻塞等待响应
  15. RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
  16. //断言response不为空
  17. assert response != null;
  18. //响应码为SUCCESS时解析响应头与响应体,返回注册结果;其他响应码直接跳出switch
  19. switch (response.getCode()) {
  20. case ResponseCode.SUCCESS: {
  21. RegisterBrokerResponseHeader responseHeader =
  22. (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
  23. RegisterBrokerResult result = new RegisterBrokerResult();
  24. result.setMasterAddr(responseHeader.getMasterAddr());
  25. result.setHaServerAddr(responseHeader.getHaServerAddr());
  26. if (response.getBody() != null) {
  27. result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
  28. }
  29. return result;
  30. }
  31. default:
  32. break;
  33. }

this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

  1. //设置开始执行时间
  2. long beginStartTime = System.currentTimeMillis();
  3. //创建连接通道,若已创建直接获取
  4. final Channel channel = this.getAndCreateChannel(addr);
  5. if (channel != null && channel.isActive()) {
  6. try {
  7. //RocketMQ请求前处理
  8. doBeforeRpcHooks(addr, request);
  9. long costTime = System.currentTimeMillis() - beginStartTime;
  10. if (timeoutMillis < costTime) {
  11. //获取通道及执行前置钩子的耗时已超过超时时间,直接抛出超时异常
  12. throw new RemotingTimeoutException("invokeSync call timeout");
  13. }
  14. //用剩余的超时时间发送请求并同步等待响应(这里为注册请求)
  15. RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
  16. //RocketMQ请求后处理
  17. doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
  18. return response;
  19. }
  20. ../部分代码省略
  21. }

this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
这块要着重讲述一下。响应结果是由org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler类在channelRead0中监听到的。判断是response指令后,通过putResponse唤醒ResponseFuture中的CountDownLatch,并通过release释放SemaphoreReleaseOnlyOnce。
之所以这样设计,可以这样理解:发送请求的线程与接收响应的Netty IO线程不是同一个线程,二者之间需要一个关联键。发送前先通过responseTable.put(opaque, responseFuture)进行登记,opaque是请求的唯一标识。之后NettyClientHandler处理读事件时,再利用opaque找到对应的responseFuture,将结果指令放入其中,同时唤醒阻塞在responseFuture上的线程继续往下执行。

  1. try {
  2. //创建响应结果future,并以opaque为键登记到responseTable
  3. final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  4. this.responseTable.put(opaque, responseFuture);
  5. final SocketAddress addr = channel.remoteAddress();
  6. //利用通道发送request请求
  7. channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
  8. @Override
  9. public void operationComplete(ChannelFuture f) throws Exception {
  10. //请求写出完成后的回调:f表示写操作(发送请求)的结果,而非响应结果
  11. if (f.isSuccess()) {
  12. //成功
  13. responseFuture.setSendRequestOK(true);
  14. return;
  15. } else {
  16. //失败
  17. responseFuture.setSendRequestOK(false);
  18. }
  19. ../省略部分代码
  20. }
  21. });
  22. //阻塞等待,直到收到响应被唤醒或等待超时
  23. RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  24. ../省略部分代码
  25. //返回结果指令
  26. return responseCommand;
  27. }

2):逻辑总结

  1. 开启Broker向所有NameServer注册的定时任务,默认30s一次
  2. 循环NameServer的地址列表,并利用线程池进行异步线程注册
  3. 注册使用的请求方式为Java NIO,也就是Netty封装的异步非阻塞IO(创建通道连接,并将数据写入缓存提交到Server端)
  4. 通过NettyClientHandler监听读事件,根据opaque将结果指令放入对应的ResponseFuture中,并唤醒等待的线程。
  5. 多处使用CountDownLatch(闭锁)来阻塞线程。一处等待所有NameServer的异步注册任务完成,一处等待单次请求的响应结果。为了避免无限等待,两处都设置了超时时间。

NameServer处理心跳包

1):代码执行

我们都知道,RocketMQ使用的是Netty封装的异步非阻塞IO来做交互,所以NameServer走的是异步processor。从调用链可以看到,注册请求最终调用的是RouteInfoManager#registerBroker。
(调用链截图:image.png)
定位到org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:

    1. try {
    2. //加写锁(可被中断),避免并发修改路由表导致数据异常
    3. this.lock.writeLock().lockInterruptibly();
    4. //clusterAddrTable:集群名称 -> 该集群下的brokerName集合,不存在则新建
    5. Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
    6. if (null == brokerNames) {
    7. brokerNames = new HashSet<String>();
    8. this.clusterAddrTable.put(clusterName, brokerNames);
    9. }
    10. //将Broker的名称放入set(非重复)集合
    11. brokerNames.add(brokerName);
    12. boolean registerFirst = false;
    13. //Broker基本信息集合
    14. BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    15. if (null == brokerData) {
    16. registerFirst = true;
    17. brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    18. this.brokerAddrTable.put(brokerName, brokerData);
    19. }
    20. //Broker的地址集合
    21. Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    22. //判断Broker地址相同,但Broker的ID不同时,删除该item。
    23. //其目的是:可能slave(从节点)变成了master(主节点),导致brokerId发生了改变
    24. //ip:port要唯一
    25. Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
    26. while (it.hasNext()) {
    27. Entry<Long, String> item = it.next();
    28. if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
    29. it.remove();
    30. }
    31. }
    32. //重新put数据
    33. String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
    34. //若oldAddr为null,说明该brokerId是第一次注册(包括上面brokerId发生改变的情况)
    35. registerFirst = registerFirst || (null == oldAddr);
    36. //若上报了Topic配置且是master节点
    37. if (null != topicConfigWrapper
    38. && MixAll.MASTER_ID == brokerId) {
    39. //判断Broker主题配置是否改变或是否第一次注册
    40. if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
    41. || registerFirst) {
    42. //重新创建QueueData或更新QueueData(Broker的路由信息)
    43. ConcurrentMap<String, TopicConfig> tcTable =
    44. topicConfigWrapper.getTopicConfigTable();
    45. if (tcTable != null) {
    46. for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
    47. this.createAndUpdateQueueData(brokerName, entry.getValue());
    48. }
    49. }
    50. }
    51. }
    52. //重新加载brokerAddr对应的心跳存活信息
    53. //直接put的好处就是新增跟更新一并搞定,坏处就是创新了一个全新的对象
    54. BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    55. new BrokerLiveInfo(
    56. System.currentTimeMillis(),
    57. topicConfigWrapper.getDataVersion(),
    58. channel,
    59. haServerAddr));
    60. //如果旧的心跳存活信息为空,说明是第一次创建
    61. if (null == prevBrokerLiveInfo) {
    62. log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
    63. }
    64. //服务过滤集合
    65. if (filterServerList != null) {
    66. if (filterServerList.isEmpty()) {
    67. this.filterServerTable.remove(brokerAddr);
    68. } else {
    69. this.filterServerTable.put(brokerAddr, filterServerList);
    70. }
    71. }
    72. //如果是从节点,且主节点的存活信息存在,将主节点的broker地址及HA服务地址设置到result中
    73. if (MixAll.MASTER_ID != brokerId) {
    74. String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    75. if (masterAddr != null) {
    76. BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
    77. if (brokerLiveInfo != null) {
    78. result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
    79. result.setMasterAddr(masterAddr);
    80. }
    81. }
    82. }
    83. } finally {
    84. //解锁
    85. this.lock.writeLock().unlock();
    86. }

2):逻辑总结

  1. 接收Broker发送来的请求,判断是注册请求后转到RouteInfoManager#registerBroker。

  2. 注册时会判断并更新多个存储集合:clusterAddrTable(Broker集群信息,存储集群中所有的brokerName)、brokerAddrTable(Broker基本信息集合)、topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡)、brokerLiveTable(Broker状态信息集合)和filterServerTable(类模式消息过滤,4.4版本后废弃)。注:Broker的心跳存活信息是直接整体覆盖的,每次都会put一个新的BrokerLiveInfo对象。估计是考虑到其中的信息(dataVersion、channel、haServerAddr等)都可能发生改变,而不像Nacos那样在心跳续约时间范围内只更新lastUpdateTimestamp。
  3. 返回RegisterBrokerResult(若为从节点,其中包含主节点地址及HA服务地址)。
  4. 注:在更新数据时加了写锁(writeLock),避免并发时造成数据异常。