文件的存储路径配置
broker.properties
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
RocketMQ消息的存储分为三个部分:
CommitLog
CommitLog:存储消息的元数据。所有Producer发过来的消息都以最快的时间会顺序存入到CommitLog文件当中。
你发过来的消息有不同的topic,QueueId Message ,不管你发的消息的topic是什么,MQ就以最快的时间给消息存储到CommitLog里面
CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
我去CommitLog文件里面找数据怎么找呢?我要找一个topic的消息我怎么找?CommitLog把消息保存之后会把消息分发到两个索引文件,一个索引文件是ConsumerQueue,还一个索引文件是IndexFile文件
CommitLog的文件名就是消息的偏移量,大小是固定的,就是 1073741824 ,这个就是1G
如果00000000000000000000文件存不下了,就会立即新建一个同样大小的文件,文件名直接采用新文件里面第一条消息的索引值作为文件名字.比如说可能是000000000000000010000 ,就表示这个文件是从第一万个消息开始存的
[root@zjj102 commitlog]# ll
总用量 466152
-rw-r--r--. 1 root root 1073741824 10月 25 21:26 00000000000000000000
[root@zjj102 commitlog]#
ConsumerQueue
ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前
MessageQueue被哪些消费者组消费到了哪一条CommitLog,就是一个队列对消息消费的进度.
为了保证效率 ConsumerQueue是不存消息内存的,只是存CommitLog文件的消息索引.这样我通过ConsumerQueue就能快速的找到消息在CommitLog里面的哪个内容了.
ConsumerQueue文件也给tag做了个过滤,保存了tag的索引,如果我们使用tag来进行消息过滤,速度也很快的.所以阿里官方建议我们过滤消息的时候要用tag来过滤,因为tag过滤效率是非常高的,直接基于RocketMQ的底层ConsumerQueue文件进行过滤消息.
# 进入到consumequeue文件夹下,里面有多个文件夹,文件夹名就是topic
[root@zjj102 consumequeue]# ls
BatchTest RMQ_SYS_TRANS_HALF_TOPIC TagFilterTest
OFFSET_MOVED_EVENT RMQ_SYS_TRANS_OP_HALF_TOPIC TopicTest
OrderTopicTest SCHEDULE_TOPIC_XXXX TopicTest2
RMQ_SYS_TRACE_TOPIC SqlFilterTest TRANS_CHECK_MAX_TIME_TOPIC
# 进入到BatchTest这个topic文件夹下
[root@zjj102 consumequeue]# cd BatchTest/
# 里面是这个topic下的队列文件夹,每个文件夹就是队列名字
[root@zjj102 BatchTest]# ls
0 1 2 3
# 进入0号队列名字,里面就是这个0号队列的文件了,也是二进制文件,这个二进制文件大小是固定的,文件名就是文件的第一个内容的偏移量
[root@zjj102 BatchTest]# cd 0/
[root@zjj102 0]# ls
00000000000000000000 00000000000006000000 00000000000012000000
IndexFile
IndexFile:这个文件是在ConsumerQueue文件的基础上给我们提供了一些辅助的索引功能.比如说基于时间戳过滤,
为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
IndexFile文件可以支持Key Hash搜索, TimeStamp搜索等等
//使用指定的消费者组名称实例化
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
//从时间戳去读消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 通过setConsumeTimestamp方法指定时间点,然后通过这个时间点往后面去读
String consumeTimestamp = "起始时间戳";
consumer.setConsumeTimestamp(consumeTimestamp );
IndexFile文件命名和CommitLog文件类似,就是每个文件是固定的大小,只不过文件名是时间戳的格式
[root@zjj102 index]# ls
20211017125813343
这三个文件都是二进制文件
其它了解不重要的文件
abort
这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
数据恢复操作比如说CommitLog把消息分发到ConsumerQueue文件IndexFile文件等等,由于你上次是 非正常关闭的,可能就出现任务分发到一半儿,RocketMQ进程就停了,此时启动RocketMQ之后,重新进行这些任务等等
checkpoint
数据存盘检查点
config/*.json
这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。是json格式存储的,程序员可以打开这个文件进行查看.
很多RocketMQ监控软件都是读取config/*.json文件来获取MQ队列信息的.