NameServer作为RocketMQ的管理“大脑”,主要为Producer和Consumer提供Broker的信息。它能够管理Broker的各个节点,保存元数据信息。可以让Broker进行路由注册,心跳续约,也可以剔除Broker。
路由元信息
Broker在NameServer中存储的信息,实现类为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager。
//Broker超时时间:默认120s
long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//读写锁,注册时避免并发
ReadWriteLock lock = new ReentrantReadWriteLock();
//Topic队列路由信息集合,消息发送时根据路由表进行负载均衡
HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//Broker基本信息集合
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//Broker集群信息,存储集群中所有的brokerName
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//Broker状态信息,NameServer根据brokerLiveTable中的时间戳来判断是否剔除该Broker
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//类模式消息过滤(4.4版本后废弃)
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
org.apache.rocketmq.common.protocol.route.QueueData类结构
public class QueueData implements Comparable<QueueData> {
//Broker的名称
private String brokerName;
//读取队列数
private int readQueueNums;
//写入队列数
private int writeQueueNums;
//读写权限
private int perm;
//Topic同步标记
private int topicSysFlag;
../后续省略
org.apache.rocketmq.common.protocol.route.BrokerData类结构
public class BrokerData implements Comparable<BrokerData> {
//集群名称
private String cluster;
//Broker名称
private String brokerName;
//Broker地址列表,brokerId=0表示Master,大于0表示Slave
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
//根据随机数选取Broker地址
private final Random random = new Random();
../后续省略
org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo类结构
class BrokerLiveInfo {
//最新时间戳
private long lastUpdateTimestamp;
//数据版本号,用于识别主从选取的版本。跟主从同步相关
private DataVersion dataVersion;
//通道
private Channel channel;
//主节点地址
private String haServerAddr;
../后续省略
路由注册
RocketMQ的路由注册是通过Broker与NameServer之间的心跳功能实现的。与Nacos注册中心的AP模式相比,主要有两点不同:(1)Nacos中既有注册,也有心跳续约,两者逻辑类似(注册时:发现已注册过就更新时间戳;心跳续约:发现已被剔除就重新注册);而RocketMQ只有心跳功能(未注册即注册,已注册则更新时间戳)。(2)Nacos的client只选择某一个server进行注册,再由该server异步同步到其他server节点,使用的是http请求;而RocketMQ的Broker是同时给集群中的所有NameServer发送心跳包,NameServer的集群节点之间不通信、无状态,且使用的是socket请求,Broker与NameServer保持长连接。
Broker启动时给NameServer集群的所有节点发送心跳,之后每隔30s发一次心跳包来进行心跳续约。NameServer收到心跳包后更新BrokerLiveTable中该Broker的最新时间戳,同时NameServer每隔10s扫描一次BrokerLiveTable:若某个Broker的最新时间戳距当前时间超过120s(即超过120s没有收到心跳),则剔除该失效的Broker,同时关闭Socket连接。
Broker发送心跳包
1):代码执行
Broker的启动类为:org.apache.rocketmq.broker.BrokerStartup。通过启动过程我们发现,Broker发送心跳包是通过一个定时线程任务来执行。
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
//开启Broker注册全部
this.registerBrokerAll(true, false, true);
}
//定时线程任务:延迟10s后首次执行,之后按registerNameServerPeriod周期发送(默认30s,取值被限制在10s~60s之间)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//开启Broker注册全部
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
BrokerController.this.registerBrokerAll(true, false, true);
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
../部分代码省略
//判断是否需要注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
//注册超时时间:默认6s
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
//执行注册逻辑
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
//调用brokerOuterAPI对所有NameServer集群进行注册,并返回注册的结果集
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
../部分代码省略
this.brokerOuterAPI.registerBrokerAll
//创建结果集集合,用来存储注册的结果
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
//获取需要进行注册的NameServer集合
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
//判断NameServer地址集合是否数量大于0
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//设置注册Broker请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
//Broker地址
requestHeader.setBrokerAddr(brokerAddr);
//BrokerId
requestHeader.setBrokerId(brokerId);
//Broker名称
requestHeader.setBrokerName(brokerName);
//集群名称
requestHeader.setClusterName(clusterName);
//集群master的地址
requestHeader.setHaServerAddr(haServerAddr);
//请求体是否压缩(下面requestBody.encode(compressed)编码时会用到)
requestHeader.setCompressed(compressed);
//注册Broker体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
//设置目前的Topic主题配置
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
//设置过滤服务列表
requestBody.setFilterServerList(filterServerList);
//转码
final byte[] body = requestBody.encode(compressed);
//转crc32码
final int bodyCrc32 = UtilAll.crc32(body);
//在请求头中设置crc32体
requestHeader.setBodyCrc32(bodyCrc32);
//创建计数等于NameServer数量的CountDownLatch(倒计时门闩,即文中所说的“同步栅栏”),目的是为了阻塞当前线程
//,待所有NameServer的异步注册任务都结束(或超时)之后才唤醒
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
//循环遍历NameServer地址集合进行注册
for (final String namesrvAddr : nameServerAddressList) {
//利用BrokerOuter线程池线程进行注册
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//注册并获取注册结果
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
//结果集不为空的情况下,放入结果集集合
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
//计数-1(countDown),当计数减到0时唤醒await的线程
countDownLatch.countDown();
}
}
});
}
try {
//同步栅栏进行阻塞,等待唤醒,或者超时自动唤醒。timeoutMills=6s
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
//创建请求指令,
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
//判断是否是单方向发送(单方向表示只发,效率高,但是无法判定是否发送成功)
if (oneway) {
try {
//使用oneway发送注册Broker请求
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//使用同步方式(invokeSync)发送Broker注册请求,阻塞等待响应
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
//断言response不为空
assert response != null;
//判定response结果码为成功,返回结果集
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
//设置开始执行时间
long beginStartTime = System.currentTimeMillis();
//创建连接通道,若已创建直接获取
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
//RocketMQ请求前处理
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
//请求前处理超时即抛异常
throw new RemotingTimeoutException("invokeSync call timeout");
}
//请求处理(这里为注册请求)
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//RocketMQ请求后处理
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
}
../部分代码省略
}
this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//这块要着重讲述一下:返回结果是由org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler类监听的,它在channelRead0中读到数据并判断是response指令后,通过putResponse和release分别唤醒CountDownLatch和SemaphoreReleaseOnlyOnce。
具体为什么这么设计笔者还没有完全搞清楚(推测是因为Netty的读写是异步的,需要用opaque把请求和响应对应起来),但至少明白了一点:发送请求前先通过responseTable.put(opaque, responseFuture)登记本次请求;之后NettyClientHandler收到读事件时,再利用opaque找到对应的responseFuture,将结果指令放入其中,同时唤醒阻塞在responseFuture上的线程继续往下执行。
try {
//创建响应结果future
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
//利用通道发送request请求
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
//判断请求是否写出(发送)成功
if (f.isSuccess()) {
//成功
responseFuture.setSendRequestOK(true);
return;
} else {
//失败
responseFuture.setSendRequestOK(false);
}
../省略部分代码
}
});
//同步栅栏阻塞,直到超时或获取结果集唤醒
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
../省略部分代码
//返回结果指令
return responseCommand;
}
2):逻辑总结
- 开启Broker注册所有NameServer的定时任务,30s一次
- 循环NameServer的地址列表,并利用线程池进行异步线程注册
- 注册使用的请求方式为Java NIO,也就是Netty封装的异步非阻塞IO(创建通道连接,并将数据写入缓存提交到Server端)
- 通过NettyClient监听读事件,将结果指令放入对应的结果集中,并唤醒线程。
在异步线程执行时多次使用CountDownLatch(倒计时门闩,文中称“同步栅栏”)来进行线程阻塞,等待任务执行完毕后被唤醒。为了避免异常情况下线程一直阻塞,还设置了超时机制。
NameServer处理心跳包
1):代码执行
我们都知道,RocketMQ使用的是Netty封装的异步非阻塞IO来做交互,所以NameServer走的是异步processor。看一下调用链,直接调用的是RouteInfoManager#registerBroker。
定位org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
try {
//加读写锁的原因是避免并发修改导致数据异常情况
this.lock.writeLock().lockInterruptibly();
//cluster地址集合
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
//将Broker的名称放入set(非重复)集合
brokerNames.add(brokerName);
boolean registerFirst = false;
//Broker基本信息集合
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//Broker的地址集合
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//判断Broker地址相同,但Broker的ID不同时,删除该item。
//其目的是因为可能slave(分节点)变成了master(主节点),所以brokerId发生了改变
//ip:port要唯一
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
//重新put数据
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
//若oldAddr==null,说明该brokerId此前没有对应的地址,即是第一次注册或者是brokerId发生了改变
registerFirst = registerFirst || (null == oldAddr);
//若是master节点
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
//判断Broker主题配置是否改变或是否第一次注册
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
//重新创建QueueData或更新QueueData(Broker的路由信息)
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
//重新加载brokerAddr对应的心跳存活信息
//直接put的好处就是新增跟更新一并搞定,坏处就是每次都创建了一个全新的对象
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
//如果旧的心跳存活信息为空,说明是第一次创建
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
//服务过滤集合
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//如果是从节点,将主节点的broker地址赋值给当前result
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
//解锁
this.lock.writeLock().unlock();
}
2):逻辑总结
接收Broker发送来的请求,判断是注册请求后转到RouteInfoManager#registerBroker。
- 注册时判断更新了多个存储集合clusterAddrTable(Broker集群信息,存储集群中所有的brokerName),brokerAddrTable(Broker基本信息集合),topicQueueTable(Topic队列路由信息集合,消息发送时根据路由表进行负载均衡),brokerLiveTable(Broker状态信息集合),filterServerTable(类模式消息过滤(4.4版本后废弃))。注:broker的心跳信息直接是覆盖操作,估计是考虑到所有信息都会发生改变的情况,不像Nacos判断在心跳续约时间范围内,直接更新lastUpdateTimestamp。
- 返回RegisterBrokerResult。
- 注:在更新数据时加上了Lock锁,避免并发时造成数据异常。