NameServer作为RocketMQ的管理“大脑”,主要为Producer和Consumer提供Broker的路由信息。它管理各个Broker节点并保存路由元数据:Broker通过它进行路由注册和心跳续约,失效的Broker也会被它剔除。
路由元信息
NameServer中存储Broker路由元信息的实现类为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager。
//Broker超时时间:默认120slong BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//读写锁,注册时避免并发ReadWriteLock lock = new ReentrantReadWriteLock();//Topic队列路由信息集合,消息发送时根据路由表进行负载均衡HashMap<String/* topic */, List<QueueData>> topicQueueTable;//Broker基本信息集合HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//Broker集群信息,存储集群中所有的brokerNameHashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//Broker状态信息,NameServer根据brokerLiveTalbe的时间戳来判断是否剔除该BrokerHashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;//类模式消息过滤(4.4版本后废弃)HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
org.apache.rocketmq.common.protocol.route.QueueData类结构
public class QueueData implements Comparable<QueueData> {//Broker的名称private String brokerName;//读取队列数private int readQueueNums;//写入队列数private int writeQueueNums;//读写权限private int perm;//Topic同步标记private int topicSysFlag;../后续省略
org.apache.rocketmq.common.protocol.route.BrokerData类结构
public class BrokerData implements Comparable<BrokerData> {//集群名称private String cluster;//Broker名称private String brokerName;//Broker地址列表,brokerId=0表示Master,大于0表示Slaveprivate HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;//根据随机数选取Broker地址private final Random random = new Random();../后续省略
org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo类结构
class BrokerLiveInfo {//最新时间戳private long lastUpdateTimestamp;//数据版本号,用于识别主从选取的版本。跟主从同步相关private DataVersion dataVersion;//通道private Channel channel;//主节点地址private String haServerAddr;../后续省略
路由注册
RocketMQ的路由注册是通过Broker向NameServer发送心跳实现的。它与Nacos注册中心(AP模式)有两点不同。第一,Nacos既有注册也有心跳续约,两者逻辑类似(注册时:发现已注册就更新时间戳。心跳续约时:发现已被剔除就重新注册),而RocketMQ只有心跳(未注册即注册,已注册则更新时间戳)。第二,Nacos的client只选择某一个server进行注册,再由该server异步同步到其他server节点,使用的是HTTP请求。而RocketMQ的Broker会同时向集群中的所有NameServer发送心跳包,NameServer节点之间互不通信,彼此独立(无状态)。并且Broker与NameServer之间通过基于Netty的TCP长连接通信,而不是HTTP请求。
Broker启动时向NameServer集群的所有节点发送心跳,此后每隔30s发送一次心跳包进行续约。NameServer收到心跳包后更新brokerLiveTable中该Broker的最新时间戳,同时每隔10s扫描一次brokerLiveTable:若某个Broker超过120s(BROKER_CHANNEL_EXPIRED_TIME)未更新时间戳,就将其剔除,并关闭对应的Socket连接。
Broker发送心跳包
1):代码执行
Broker的启动类为:org.apache.rocketmq.broker.BrokerStartup。跟踪启动过程可以发现,Broker发送心跳包是由BrokerController启动时创建的一个定时线程任务来执行的。
if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());//开启Broker注册全部this.registerBrokerAll(true, false, true);}//定时线程任务,10s后执行,30s发送一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//开启Broker注册全部BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
BrokerController.this.registerBrokerAll(true, false, true);
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {../部分代码省略//判断是否需要注册if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),//注册超时时间:默认6sthis.brokerConfig.getRegisterBrokerTimeoutMills())) {//执行注册逻辑doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}
doRegisterBrokerAll(true,false,true);
//调用brokerOuterAPI对所有NameServer集群进行注册,并返回注册的结果集List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());../部分代码省略
this.brokerOuterAPI.registerBrokerAll
//创建结果集集合,用来存储注册的结果final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();//获取需要进行注册的NameServer集合List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();//判断NameServer地址集合是否数量大于0if (nameServerAddressList != null && nameServerAddressList.size() > 0) {//设置注册Broker请求头final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();//Broker地址requestHeader.setBrokerAddr(brokerAddr);//BrokerIdrequestHeader.setBrokerId(brokerId);//Broker名称requestHeader.setBrokerName(brokerName);//集群名称requestHeader.setClusterName(clusterName);//集群master的地址requestHeader.setHaServerAddr(haServerAddr);//暂时不知道requestHeader.setCompressed(compressed);//注册Broker体RegisterBrokerBody requestBody = new RegisterBrokerBody();//设置目前的Topic主题配置requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);//设置过滤服务列表requestBody.setFilterServerList(filterServerList);//转码final byte[] body = requestBody.encode(compressed);//转crc32码final int bodyCrc32 = UtilAll.crc32(body);//在请求头中设置crc32体requestHeader.setBodyCrc32(bodyCrc32);//创建与NameServer相同数量的同步栅栏,目的是为了阻塞当前线程//,利用异步的方式注册所有NameServer之后才唤醒final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());//循环遍历NameServer地址集合进行注册for (final String namesrvAddr : nameServerAddressList) {//利用BrokerOuter线程池线程进行注册brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {//注册并获取注册结果RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {//结果集不为空的情况下,放入结果集集合registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {//同步栅栏+1,当同步栅栏的count数量等于创建数量时唤醒await线程countDownLatch.countDown();}}});}try {//同步栅栏进行阻塞,等待唤醒,或者超时自动唤醒。timeoutMills=6scountDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}
registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
//创建请求指令,RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);//判断是否是单方向发送(单方向表示只发,效率高,但是无法判定是否发送成功)if (oneway) {try {//使用oneway发送注册Broker请求this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}//使用异步发送Broker注册请求RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);//断言response不为空assert response != null;//判定response结果码为成功,返回结果集switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}
this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
//设置开始执行时间long beginStartTime = System.currentTimeMillis();//创建连接通道,若已创建直接获取final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {//RocketMQ请求前处理doBeforeRpcHooks(addr, request);long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {//请求前处理超时即抛异常throw new RemotingTimeoutException("invokeSync call timeout");}//请求处理(这里为注册请求)RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);//RocketMQ请求后处理doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);return response;}../部分代码省略}
this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//这块要着重讲述一下:返回结果是由 org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler 类在channelRead0中监听到的。判断为response指令后,先调用putResponse唤醒CountDownLatch,再调用release释放SemaphoreReleaseOnlyOnce。
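可以明确的流程是:发送请求前,先以opaque(请求ID)为键执行responseTable.put(opaque, responseFuture)。之后NettyClientHandler收到读事件时,再根据opaque找到对应的responseFuture,把响应指令放进去,同时唤醒在responseFuture上等待的线程继续往下执行。之所以这样设计,是因为Netty的writeAndFlush是异步的:响应由Netty的IO线程读取,而不是由发起请求的线程读取,并且同一个Channel上可能同时存在多个请求。因此需要用opaque把请求与响应一一对应起来,再用CountDownLatch阻塞调用线程直到响应到达或超时,从而在异步IO之上实现同步调用(invokeSync)。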
try {//创建相应结果futurefinal ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();//利用通道发送request请求channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {//获取响应结果if (f.isSuccess()) {//成功responseFuture.setSendRequestOK(true);return;} else {//失败responseFuture.setSendRequestOK(false);}../省略部分代码}});//同步栅栏阻塞,直到超时或获取结果集唤醒RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);../省略部分代码//返回结果指令return responseCommand;}
2):逻辑总结
- 启动定时任务,向所有NameServer注册Broker:首次延迟10s执行,之后默认每30s执行一次(周期取registerNameServerPeriod,并被限制在10s~60s之间)。另外,非DLedger模式下Broker启动时会先立即注册一次。
- 遍历NameServer地址列表,通过brokerOuterExecutor线程池异步地向每个NameServer注册,并用CountDownLatch等待全部注册完成(最长等待timeoutMills,默认6s)。
- 注册请求通过Netty(基于Java NIO的异步非阻塞IO框架)发送:先获取或创建与NameServer之间的Channel连接,再通过writeAndFlush把请求数据写出到NameServer。
- 由NettyClientHandler监听读事件,根据opaque将响应指令放入对应的ResponseFuture中,并唤醒等待的线程。
在这个过程中多处使用了CountDownLatch(倒计时闭锁,并不是CyclicBarrier那样的“栅栏”)来阻塞线程、等待结果:一处是等待所有NameServer的注册任务完成,一处是等待单个请求的响应返回。为避免某个NameServer无响应导致线程一直阻塞,两处都设置了超时时间。
NameServer处理心跳包
1):代码执行
我们都知道,RocketMQ基于Netty封装的异步非阻塞IO进行通信,因此NameServer端走的是异步processor。从调用链可以看到,注册请求最终直接调用的是RouteInfoManager#registerBroker。

定位org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBrokertry {//加读写锁的原因是避免并发修改导致数据异常情况this.lock.writeLock().lockInterruptibly();//cluster地址集合Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}//将Broker的名称放入set(非重复)集合brokerNames.add(brokerName);boolean registerFirst = false;//Broker基本信息集合BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}//Broker的地址集合Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//判断Broker地址相同,但Broker的ID不同时,删除该item。//其目的是因为可能slave(分节点)变成了master(主节点),所以brokerId发生了改变//ip:port要唯一Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}}//重新put数据String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);//若oldAddr!=null,说明是一次注册或者是brokerId发生了改变registerFirst = registerFirst || (null == oldAddr);//若是master节点if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {//判断Broker主题配置是否改变或是否第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {//重新创建QueueData或更新QueueData(Broker的路由信息)ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//重新加载brokerAddr对应的心跳存活信息//直接put的好处就是新增跟更新一并搞定,坏处就是创新了一个全新的对象BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));//如果旧的心跳存活信息为空,说明是第一次创建if 
(null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//服务过滤集合if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//如果是从节点,将主节点的broker地址赋值给当前resultif (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {//解锁this.lock.writeLock().unlock();}
2):逻辑总结
NameServer接收Broker发来的请求,判断是注册请求(RequestCode.REGISTER_BROKER)后,交给RouteInfoManager#registerBroker处理。
- 注册时会按条件更新多个存储集合:clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡。仅当注册方是Master(brokerId为0),且Topic配置版本发生变化或是首次注册时才会更新),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))。注:Broker的心跳信息是直接put覆盖的(每次都新建一个BrokerLiveInfo),推测是因为心跳中携带的dataVersion、channel、haServerAddr等信息都可能发生变化。这与Nacos不同:Nacos在心跳续约有效期内只更新lastUpdateTimestamp。
- 返回RegisterBrokerResult:若注册的是Slave节点,结果中会带上Master的地址(masterAddr)以及Master的HA服务地址(haServerAddr)。
- 注:更新路由数据前会先获取读写锁的写锁(lock.writeLock().lockInterruptibly()),并在finally中释放,避免并发注册时造成数据异常。
