文件的存储路径配置

broker.properties

  1. #存储路径
  2. storePathRootDir=/app/rocketmq/store
  3. #commitLog 存储路径
  4. storePathCommitLog=/app/rocketmq/store/commitlog
  5. #消费队列存储路径存储路径
  6. storePathConsumeQueue=/app/rocketmq/store/consumequeue
  7. #消息索引存储路径
  8. storePathIndex=/app/rocketmq/store/index
  9. #checkpoint 文件存储路径
  10. storeCheckpoint=/app/rocketmq/store/checkpoint
  11. #abort 文件存储路径
  12. abortFile=/app/rocketmq/store/abort

RocketMQ消息的存储分为三个部分:

CommitLog

CommitLog:存储消息的元数据。所有Producer发过来的消息都以最快的时间会顺序存入到CommitLog文件当中。

你发过来的消息有不同的topic,QueueId Message ,不管你发的消息的topic是什么,MQ就以最快的时间给消息存储到CommitLog里面

CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。

我去CommitLog文件里面找数据怎么找呢?我要找一个topic的消息我怎么找?CommitLog把消息保存之后会把消息分发到两个索引文件,一个索引文件是ConsumerQueue,还一个索引文件是IndexFile文件

CommitLog的文件名就是消息的偏移量,大小是固定的,就是 1073741824 ,这个就是1G

如果00000000000000000000文件存不下了,就会立即新建一个同样大小的文件,文件名直接采用新文件里面第一条消息的索引值作为文件名字.比如说可能是000000000000000010000 ,就表示这个文件是从第一万个消息开始存的

  1. [root@zjj102 commitlog]# ll
  2. 总用量 466152
  3. -rw-r--r--. 1 root root 1073741824 10 25 21:26 00000000000000000000
  4. [root@zjj102 commitlog]#

ConsumerQueue

ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前
MessageQueue被哪些消费者组消费到了哪一条CommitLog,就是一个队列对消息消费的进度.

为了保证效率 ConsumerQueue是不存消息内存的,只是存CommitLog文件的消息索引.这样我通过ConsumerQueue就能快速的找到消息在CommitLog里面的哪个内容了.

ConsumerQueue文件也给tag做了个过滤,保存了tag的索引,如果我们使用tag来进行消息过滤,速度也很快的.所以阿里官方建议我们过滤消息的时候要用tag来过滤,因为tag过滤效率是非常高的,直接基于RocketMQ的底层ConsumerQueue文件进行过滤消息.

  1. # 进入到consumequeue文件夹下,里面有多个文件夹,文件夹名就是topic
  2. [root@zjj102 consumequeue]# ls
  3. BatchTest RMQ_SYS_TRANS_HALF_TOPIC TagFilterTest
  4. OFFSET_MOVED_EVENT RMQ_SYS_TRANS_OP_HALF_TOPIC TopicTest
  5. OrderTopicTest SCHEDULE_TOPIC_XXXX TopicTest2
  6. RMQ_SYS_TRACE_TOPIC SqlFilterTest TRANS_CHECK_MAX_TIME_TOPIC
  7. # 进入到BatchTest这个topic文件夹下
  8. [root@zjj102 consumequeue]# cd BatchTest/
  9. # 里面是这个topic下的队列文件夹,每个文件夹就是队列名字
  10. [root@zjj102 BatchTest]# ls
  11. 0 1 2 3
  12. # 进入0号队列名字,里面就是这个0号队列的文件了,也是二进制文件,这个二进制文件大小是固定的,文件名就是文件的第一个内容的偏移量
  13. [root@zjj102 BatchTest]# cd 0/
  14. [root@zjj102 0]# ls
  15. 00000000000000000000 00000000000006000000 00000000000012000000

IndexFile

IndexFile:这个文件是在ConsumerQueue文件的基础上给我们提供了一些辅助的索引功能.比如说基于时间戳过滤,

为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

IndexFile文件可以支持Key Hash搜索, TimeStamp搜索等等

  1. //使用指定的消费者组名称实例化
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
  3. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  4. //从时间戳去读消息
  5. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
  6. // 通过setConsumeTimestamp方法指定时间点,然后通过这个时间点往后面去读
  7. String consumeTimestamp = "起始时间戳";
  8. consumer.setConsumeTimestamp(consumeTimestamp );

IndexFile文件命名和CommitLog文件类似,就是每个文件是固定的大小,只不过文件名是时间戳的格式

  1. [root@zjj102 index]# ls
  2. 20211017125813343

这三个文件都是二进制文件

其它了解不重要的文件

abort

这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

数据恢复操作比如说CommitLog把消息分发到ConsumerQueue文件IndexFile文件等等,由于你上次是 非正常关闭的,可能就出现任务分发到一半儿,RocketMQ进程就停了,此时启动RocketMQ之后,重新进行这些任务等等

checkpoint

数据存盘检查点

config/*.json

这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。是json格式存储的,程序员可以打开这个文件进行查看.

很多RocketMQ监控软件都是读取config/*.json文件来获取MQ队列信息的.

整体的消息存储结构如下图:

RocketMQ的消息存储结构 - 图1