源码搭建
源码地址:https://github.com/apache/rocketmq
关键模块
- broker :broke 启动进程
- client:客户端,包括生产者消费者
- example:示例代码
- namesrv:nameServer
- store:消息存储实现相关
其他在docs文件有相关Junit测试代码,重要
带注解的源码:
rocketmq-all-4.7.1-source-release-带注释.zip
调试时,先在项目目录下创建一个conf目录,
并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
1:nameServ启动
配置环境变量
核心问题:
- 维护broker服务地址以及及时更新
- 给producer,consumer 提供获取broker列表
流程
重要类:
NamesrvController
类似web中的conroller,相应客户端请求的NamesrvConfig
- NettyServerConfig
2:broker启动
配置broker.conf文件
配置对应的文件路径
brokerClusterName = DefaultCluster
brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
自动创建Topic
autoCreateTopicEnable=true
nameServ地址
namesrvAddr=127.0.0.1:9876
存储路径
storePathRootDir=E:\RocketMQ\data\rocketmq\dataDir
commitLog路径
storePathCommitLog=E:\RocketMQ\data\rocketmq\dataDir\commitlog
消息队列存储路径
storePathConsumeQueue=E:\RocketMQ\data\rocketmq\dataDir\consumequeue
消息索引存储路径
storePathIndex=E:\RocketMQ\data\rocketmq\dataDir\index
checkpoint文件路径
storeCheckpoint=E:\RocketMQ\data\rocketmq\dataDir\checkpoint
abort文件存储路径
abortFile=E:\RocketMQ\data\rocketmq\dataDir\abort
启动类:BrokerStartup
启动方法:
BrokerStartup.createBrokerController
管理类:
- BrokerController
Broker核心配置:
BrokerConfig
NettyServerConfig :Netty服务端占用了10911端口。
NettyClientConfig
MessageStoreConfig
this.messageStore.start();启动核心的消息存储组件
this.remotingServer.start();
this.fastRemotingServer.start(); 启动两个Netty服务
this.brokerOuterAPI.start();启动客户端,往外发请求
BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
this.brokerStatsManager.start();
this.brokerFastFailure.start();这也是一些负责具体业务的功能组件
Broker架构图
3:broker注册
方法:
BrokerController.this.registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
//Topic配置相关的东东
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
//这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
4:producer
注意Producer有两种。 一个是普通发送者。这个只需要构建一个Netty客户端。
- 普通的发送者DefaultMQProducer
- 事务的发送者TransactionMQProducer
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
主线流程
- produccer需要拉去broker列表
- 并不建立连接,
Send方法:
先获取topic路由,可从本地缓存区,没有再去NameServer申请
然后选取一个MessageQueue,轮训采用取模方式
最后netty请求发送消息,到broker后,由commitlog写入commitlog文件
//Producer选择MessageQueue的方法
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//这个sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不在往同一个Queue重复发送的机制
if (this.sendLatencyFaultEnable) {
try {
//K2 这里可以看到,Producer选择MessageQueue的方法就是自增,然后取模。并且只有这一种方法。
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//Broker轮询。尽量将请求平均分配给不同的Broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
//这里计算也还是自增取模
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
5:消息存储
方法入口: DefaultMessageStore.putMessage
最终存储的文件:
- commitLog:消息存储目录
broker写入消息的实际接口,
会把消息追加到MappedFile内存,不是直接写入磁盘
串行化的 - config:运行期间一些配置信息
- consumerqueue:消息消费队列存储目录
commitlog写入后,后台线程reputMessageService会拉去commitlog的最新消息,
分别转发到ComsumeQueue和indexFile
如果宕机,会存在不一致问题
DefaultMappedStore #load提供恢复方法
文件同步刷盘,异步刷盘
入口CommitLog.putMessage -> CommitLog.handleDiskFlush
是否启动对外内存:TransientStorePoolEnable
如果开启会申请一个commitlog大小文件一致的内存
- index:消息索引文件存储目录
- abort:如果存在改文件寿命Broker非正常关闭
过期文件删除
入口:
DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
会检查commitlog,comsumerQueue,超过72小时会删除 - checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
RocketMQ文件包括:
- commitlog:消息文件
- comsumerQueue:消息消费队列文件
- IndexFile:hash索引文件
- CheckPoint:检测点文件
- abort:关闭异常文件
MappedFile:涉及零拷贝的实现
6:消费者
集群模式
广播模式
推模式:DefaultMQPushConsumerImpl
拉模式
消息顺序:只支持一个队列上的局部消息顺序
ConsumeMessageOrderlyService 使用加锁机制保证
RebalanceImpl:负载均衡,
启动:
DefaultMQPushConsumer.start方法
DefaultMQPushConsumerImpl
客户断mQClientFactory 主要是启动了一大堆的服务
消息拉取
拉模式:PullMessageService
messageQueue:负责拉取消息,
processQueue:拉取的消息存在processQueue
长轮训拉取机制
配置项longPollingEnable=true开启长轮训模式
PullMessageProcessor#processRequest()
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
//K2 消息长轮询
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//没有拉取到消息,就再创建一个拉取请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//将请求放入ManyRequestPull请求队列
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
PullRequestHoldService会每个五秒查看是否有新消息,
NotifyMessageArrivingListener监听机制:
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//分发CommitLog写入消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
//K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
//唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
负责均衡
RebalanceService 负责客户端负责聚恒
RebalanceImpl
consumer的allocateMessageQueueStrategy属性来选择。
五种策略:
- AllocateMessageQueueAveragely,最常用,平均分
- AllocateMessageQueueAveragelyByCircle平均轮训分配
是把MessageQueue按照组内消费平均轮训
7:延迟消息
入口scheduleMessageService
start方法 有个CAS锁,只有一个线程进行消息搬运
可以在这个地方进行改写
写入的消息会转入SCHEDULE_TOPIC_XXXX这个Topic
默认是18个级别的队列
入口:CommitLog.putMessage
ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务
将消息从延迟队列中写入正常Topic中
入口:ScheduleMessageService#DeliverDelayedMessageTimerTask.executeOnTimeup
ConsumeMessageOrderlyService.