源码搭建

源码地址:https://github.com/apache/rocketmq
关键模块

  1. broker :broke 启动进程
  2. client:客户端,包括生产者消费者
  3. example:示例代码
  4. namesrv:nameServer
  5. store:消息存储实现相关

其他在docs文件有相关Junit测试代码,重要

带注解的源码:
rocketmq-all-4.7.1-source-release-带注释.zip

调试时,先在项目目录下创建一个conf目录,
并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
1654958823641.png

1:nameServ启动

配置环境变量
1654958916744.png
image.png

核心问题:

  1. 维护broker服务地址以及及时更新
  2. 给producer,consumer 提供获取broker列表

流程

image.png

重要类:

  1. NamesrvController
    类似
    web中的conroller,相应客户端请求的

  2. NamesrvConfig

  3. NettyServerConfig

1654959829873.png

2:broker启动

配置broker.conf文件
配置对应的文件路径

brokerClusterName = DefaultCluster

brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH

自动创建Topic

autoCreateTopicEnable=true

nameServ地址

namesrvAddr=127.0.0.1:9876

存储路径

storePathRootDir=E:\RocketMQ\data\rocketmq\dataDir

commitLog路径

storePathCommitLog=E:\RocketMQ\data\rocketmq\dataDir\commitlog

消息队列存储路径

storePathConsumeQueue=E:\RocketMQ\data\rocketmq\dataDir\consumequeue

消息索引存储路径

storePathIndex=E:\RocketMQ\data\rocketmq\dataDir\index

checkpoint文件路径

storeCheckpoint=E:\RocketMQ\data\rocketmq\dataDir\checkpoint

abort文件存储路径

abortFile=E:\RocketMQ\data\rocketmq\dataDir\abort

启动类:BrokerStartup

启动方法:
BrokerStartup.createBrokerController
管理类:

  1. BrokerController

Broker核心配置:
BrokerConfig
NettyServerConfig :Netty服务端占用了10911端口。
NettyClientConfig
MessageStoreConfig

  1. this.messageStore.start();启动核心的消息存储组件
  2. this.remotingServer.start();
  3. this.fastRemotingServer.start(); 启动两个Netty服务
  4. this.brokerOuterAPI.start();启动客户端,往外发请求
  5. BrokerController.this.registerBrokerAll NameServer注册心跳。
  6. this.brokerStatsManager.start();
  7. this.brokerFastFailure.start();这也是一些负责具体业务的功能组件

Broker架构图

1655023310801.png

3:broker注册

方法:

BrokerController.this.registerBrokerAll

  1. public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
  2. //Topic配置相关的东东
  3. TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
  4. if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
  5. || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
  6. ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
  7. for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
  8. TopicConfig tmp =
  9. new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
  10. this.brokerConfig.getBrokerPermission());
  11. topicConfigTable.put(topicConfig.getTopicName(), tmp);
  12. }
  13. topicConfigWrapper.setTopicConfigTable(topicConfigTable);
  14. }
  15. //这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
  16. if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
  17. this.getBrokerAddr(),
  18. this.brokerConfig.getBrokerName(),
  19. this.brokerConfig.getBrokerId(),
  20. this.brokerConfig.getRegisterBrokerTimeoutMills())) {
  21. doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
  22. }
  23. }

image.png

4:producer

注意Producer有两种。 一个是普通发送者。这个只需要构建一个Netty客户端。

  1. 普通的发送者DefaultMQProducer
  2. 事务的发送者TransactionMQProducer
    1. public void start() throws MQClientException {
    2. this.setProducerGroup(withNamespace(this.producerGroup));
    3. this.defaultMQProducerImpl.start();
    4. if (null != traceDispatcher) {
    5. try {
    6. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    7. } catch (MQClientException e) {
    8. log.warn("trace dispatcher start failed ", e);
    9. }
    10. }
    11. }

主线流程

  1. produccer需要拉去broker列表
  2. 并不建立连接,

Send方法:
先获取topic路由,可从本地缓存区,没有再去NameServer申请
然后选取一个MessageQueue,轮训采用取模方式
最后netty请求发送消息,到broker后,由commitlog写入commitlog文件

image.png

  1. //Producer选择MessageQueue的方法
  2. public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  3. //这个sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不在往同一个Queue重复发送的机制
  4. if (this.sendLatencyFaultEnable) {
  5. try {
  6. //K2 这里可以看到,Producer选择MessageQueue的方法就是自增,然后取模。并且只有这一种方法。
  7. int index = tpInfo.getSendWhichQueue().getAndIncrement();
  8. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
  9. int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
  10. if (pos < 0)
  11. pos = 0;
  12. MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  13. //Broker轮询。尽量将请求平均分配给不同的Broker
  14. if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
  15. if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
  16. return mq;
  17. }
  18. }
  19. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
  20. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
  21. if (writeQueueNums > 0) {
  22. //这里计算也还是自增取模
  23. final MessageQueue mq = tpInfo.selectOneMessageQueue();
  24. if (notBestBroker != null) {
  25. mq.setBrokerName(notBestBroker);
  26. mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
  27. }
  28. return mq;
  29. } else {
  30. latencyFaultTolerance.remove(notBestBroker);
  31. }
  32. } catch (Exception e) {
  33. log.error("Error occurred when selecting message queue", e);
  34. }
  35. return tpInfo.selectOneMessageQueue();
  36. }
  37. return tpInfo.selectOneMessageQueue(lastBrokerName);
  38. }


5:消息存储

方法入口: DefaultMessageStore.putMessage
最终存储的文件:

  1. commitLog:消息存储目录
    broker写入消息的实际接口,
    会把消息追加到MappedFile内存,不是直接写入磁盘
    串行化的
  2. config:运行期间一些配置信息
  3. consumerqueue:消息消费队列存储目录
    commitlog写入后,后台线程reputMessageService会拉去commitlog的最新消息,
    分别转发到ComsumeQueue和indexFile
    如果宕机,会存在不一致问题
    DefaultMappedStore #load提供恢复方法

文件同步刷盘,异步刷盘
入口CommitLog.putMessage -> CommitLog.handleDiskFlush
是否启动对外内存:TransientStorePoolEnable
如果开启会申请一个commitlog大小文件一致的内存

  1. index:消息索引文件存储目录
  2. abort:如果存在改文件寿命Broker非正常关闭
    过期文件删除
    入口:
    DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
    会检查commitlog,comsumerQueue,超过72小时会删除
    image.png
  3. checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

RocketMQ文件包括:

  1. commitlog:消息文件
  2. comsumerQueue:消息消费队列文件
  3. IndexFile:hash索引文件
  4. CheckPoint:检测点文件
  5. abort:关闭异常文件

MappedFile:涉及零拷贝的实现

6:消费者

集群模式
广播模式
推模式:DefaultMQPushConsumerImpl
拉模式
消息顺序:只支持一个队列上的局部消息顺序
ConsumeMessageOrderlyService 使用加锁机制保证
RebalanceImpl:负载均衡,

启动:

DefaultMQPushConsumer.start方法
DefaultMQPushConsumerImpl
客户断mQClientFactory 主要是启动了一大堆的服务

消息拉取

拉模式:PullMessageService
messageQueue:负责拉取消息,
processQueue:拉取的消息存在processQueue
image.png
长轮训拉取机制
配置项longPollingEnable=true开启长轮训模式
PullMessageProcessor#processRequest()

  1. case ResponseCode.PULL_NOT_FOUND:
  2. if (brokerAllowSuspend && hasSuspendFlag) {
  3. long pollingTimeMills = suspendTimeoutMillisLong;
  4. //K2 消息长轮询
  5. if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
  6. pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  7. }
  8. String topic = requestHeader.getTopic();
  9. long offset = requestHeader.getQueueOffset();
  10. int queueId = requestHeader.getQueueId();
  11. //没有拉取到消息,就再创建一个拉取请求
  12. PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
  13. this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
  14. //将请求放入ManyRequestPull请求队列
  15. this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  16. response = null;
  17. break;
  18. }

PullRequestHoldService会每个五秒查看是否有新消息,
NotifyMessageArrivingListener监听机制:

  1. if (dispatchRequest.isSuccess()) {
  2. if (size > 0) {
  3. //分发CommitLog写入消息
  4. DefaultMessageStore.this.doDispatch(dispatchRequest);
  5. //K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。
  6. if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
  7. && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
  8. //唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查
  9. DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
  10. dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
  11. dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
  12. dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
  13. }

负责均衡

RebalanceService 负责客户端负责聚恒
RebalanceImpl
consumer的allocateMessageQueueStrategy属性来选择。

五种策略:

  1. AllocateMessageQueueAveragely,最常用,平均分
  2. AllocateMessageQueueAveragelyByCircle平均轮训分配
    是把MessageQueue按照组内消费平均轮训

7:延迟消息

入口scheduleMessageService
start方法 有个CAS锁,只有一个线程进行消息搬运
可以在这个地方进行改写
image.png
写入的消息会转入SCHEDULE_TOPIC_XXXX这个Topic
默认是18个级别的队列
入口:CommitLog.putMessage
ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务
将消息从延迟队列中写入正常Topic中
入口:ScheduleMessageService#DeliverDelayedMessageTimerTask.executeOnTimeup

image.png
ConsumeMessageOrderlyService.