Ref: https://www.cnblogs.com/duanxz/p/5020398.html
一、RocketMQ 的消息存储基本介绍
先看一张图:
1、Commit log 存储消息实体。顺序写,随机读。
2、Message queue 存储消息的偏移量。读消息先读 message queue,根据偏移量到 commit log 读消息本身。
3、索引队列用来存储消息的索引 key.
使用 mmap 方式减少内存拷贝,提高读取性能。具体实现:FileChannel.map(RandomAccessFile)
CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享。
在 CommitLog,一个消息的存储长度是不固定的,RocketMQ 采用了一些机制,尽量向 CommitLog 中顺序写,但是随机读。
[磁盘存储的 “快”—— 顺序写 ]
磁盘存储,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度,目前的高性能磁盘,顺序写速度可以达到 600MB/s,超过了一般网卡的传输速度。
[磁盘存储的 “慢”—— 随机写 ]
磁盘的随机写的速度只有 100KB/s,和顺序写的性能差了好几个数量级。
[存储机制这样设计的好处 —— 顺序写,随机读]
1. CommitLog 顺序写,可以大大提高写入的效率。
2. 虽然是随机读,但是利用 package 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3. 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构,因为 ConsumeQueue 里只存储偏移量信息,所以尺寸是有限的。在实际情况中,大部分 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。
[如何保证 CommitLog 和 ConsumeQueue 的一致性?]
CommitLog 里存储了 Consume Queues、Message Queue、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。
RocketMQ 的 Broker 机器磁盘上的文件存储结构
1.1、RocketMQ 的消息存储主要有如下概念:
(1)CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
(2) ConsumeQueue:消息消费的逻辑队列,作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。 而 IndexFile(索引文件)则只是为了消息查询提供了一种通过 key 或时间区间来查询消息的方法(ps:这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程)。从实际物理存储来说,ConsumeQueue 对应每个 Topic 和 QueuId 下面的文件。单个文件大小约 5.72M,每个文件由 30W 条数据组成,每个文件默认大小为 600 万个字节,当一个 ConsumeQueue 类型的文件写满了,则写入下一个文件;
(3)IndexFile:因为所有的消息都存在 CommitLog 中,如果要实现根据 key 查询 消息的方法,就会变得非常困难,所以为了解决这种业务需求,有了 IndexFile 的存在。用于为生成的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引;
(4)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该 offset 所在 MappedFile (具体物理存储位置的抽象)、创建、删除 MappedFile 等操作;
(5)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入 PageCache 缓存区(commit),或者原子性地将消息持久化的刷盘(flush);
在 RocketMQ 中消息刷盘主要可以分为同步刷盘和异步刷盘两种:
1.2、RocketMQ 消息刷盘的主要过程
CommitLog 写入:
MapedFileQueue 存储队列,数据定时删除,无限增长。
队列有多个文件(MapedFile)组成
当消息到达 broker 时,需要获取最新的 MapedFile 写入数据,调用 MapedFileQueue 的 getLastMapedFile 获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创建一个新的。
MapedFileQueue 在获取 getLastMapedFile 时,如果需要创建新的 MapedFile 会计算出下一个 MapedFile 文件地址,通过预分配服务 AllocateMapedFileService 异步预创建下一个 MapedFile 文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个 1G 的文件还是有点耗时的,
后续如果是异步刷盘还需要将 mapedFile 中的消息序列化到 commitLog 物理文件
consumeQueue 写入:
也采用 mappedFile 文件内存映射。
底层也用与 commitLog 相同的 MapedFileQueue 数据结构。
consume queue 中存储单元是一个 20 字节定长的数据,是顺序写顺序读
(1)commitLogOffset 是指这条消息在 commitLog 文件实际偏移量
(2)size 就是指消息大小
(3)消息 tag 的哈希值
(1)同步刷盘:如上图所示,只有在消息真正持久化至磁盘后,RocketMQ 的 Broker 端才会真正地返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。RocketMQ 同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例 —GroupCommitRequest 并在放入刷盘写队列后唤醒同步刷盘线程 —GroupCommitService,来执行刷盘动作(其中用了 CAS 变量和 CountDownLatch 来保证线程间的同步)。这里,RocketMQ 源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。
(2)异步刷盘:能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程 wakeup 后,就会继续执行。
1.3、几个主要的组件说明
1.3.1、ConsumeQueue
consumeQueue 是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件 commitLog 上的位置。其中包含了这个 MessageQueue 在 CommitLog 中的起始物理位置偏移量 offset,消息实体内容的大小和 Message Tag 的哈希值。从实际物理存储来说,ConsumeQueue 对应每个 Topic 和 QueuId 下面的文件。单个文件大小约 5.72M,每个文件由 30W 条数据组成,每个文件默认大小为 600 万个字节,当一个 ConsumeQueue 类型的文件写满了,则写入下一个文件;
我们可以在配置中指定 consumequeue 与 commitlog 存储的目录
每个 topic 下的每个 queue 都有一个对应的 consumequeue 文件,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue 文件组织,如图所示:
Consume Queue 文件组织示意图
- 根据 topic 和 queueId 来组织文件,图中 TopicA 有两个队列 0,1,那么 TopicA 和 QueueId=0 组成一个 ConsumeQueue,TopicA 和 QueueId=1 组成另一个 ConsumeQueue。
- 按照消费端的 GroupName 来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的 %RETRY%ConsumerGroupA。
- 按照消费端的 GroupName 来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的 %DLQ%ConsumerGroupA。
死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。
Consume Queue 中存储单元是一个 20 字节定长的二进制数据,顺序写顺序读,如下图所示:
Queue 单个存储单元结构
consume queue 文件存储单元格式
- CommitLog Offset 是指这条消息在 Commit Log 文件中的实际偏移量
- Size 存储中消息的大小
Message Tag HashCode 存储消息的 Tag 的哈希值:主要用于订阅时消息过滤(订阅时如果指定了 Tag,会根据 HashCode 来快速查找到订阅的消息)
1.3.2、Commit Log
CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容。消息存放的物理文件,每台 broker 上的 commitlog 被本机所有的 queue 共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:${user.home} \store\${commitlog}\${fileName}
CommitLog 的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。
1.3.3、IndexFile 消息的索引文件
IndexFile:用于为生成的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引;
indexFile 存放的位置:${rocketmq.home}/store/index/indexFile(年月日时分秒等组成文件名)
Index 生成过程
上一篇 (https://www.jianshu.com/p/606d4b77d504) 讲 ConsumeQueue 的时候,有一个 ReputMessageService 在分发消息的时候还会调用 CommitLogDispatcherBuildIndex 用来创建 index。这个类实现就是直接调用的 IndexService.buildIndex()
如果一个消息包含 key 值的话,会使用 IndexFile 存储消息索引,文件的内容结构如图:
消息索引
索引文件主要用于根据 key 来查询消息的,流程主要是:- 根据查询的 key 的 hashcode% slotNum 得到具体的槽的位置 (slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
- 根据 slotValue (slot 位置对应的值) 查找到索引项列表的最后一项 (倒序排列,slotValue 总是指向最新的一个索引项)
- 遍历索引项列表返回查询时间范围内的结果集 (默认一次最大返回的 32 条记录)
二、RocketMQ 的消息存储原理
消息存储是 MQ 消息队列中最为复杂和最为重要的一部分,本文先从目前几种比较常用的 MQ 消息队列存储方式出发,为大家介绍 RocketMQ 选择磁盘文件存储的原因。然后,本文分别从 RocketMQ 的消息存储整体架构和 RocketMQ 文件存储模型层次结构两方面进行深入分析介绍。使得大家读完本文后对 RocketMQ 消息存储部分有一个大致的了解和认识。
2.1、MQ 消息队列的一般存储方式
当前业界几款主流的 MQ 消息队列采用的存储方式主要有以下三种方式:
(1)分布式 KV 存储:这类 MQ 一般会采用诸如 levelDB、RocksDB 和 Redis 来作为消息持久化的方式,由于分布式缓存的读写能力要优于 DB,所以在对消息的读写能力要求都不是比较高的情况下,采用这种方式倒也不失为一种可以替代的设计方案。消息存储于分布式 KV 需要解决的问题在于如何保证 MQ 整体的可靠性?
(2)文件系统:目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机 / 物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。小编认为,消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署 MQ 机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
(3)关系型数据库 DB:Apache 下开源的另外一款 MQ—ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 的方式来做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。由于,普通关系型数据库(如 Mysql)在单表数据量达到千万级别的情况下,其 IO 读写性能往往会出现瓶颈。因此,如果要选型或者自研一款性能强劲、吞吐量大、消息堆积能力突出的 MQ 消息队列,那么小编并不推荐采用关系型数据库作为消息持久化的方案。在可靠性方面,该种方案非常依赖 DB,如果一旦 DB 出现故障,则 MQ 的消息就无法落盘存储会导致线上故障;
因此,综合上所述从存储效率来说, 文件系统 > 分布式 KV 存储 > 关系型数据库 DB,直接操作文件系统肯定是最快和最高效的,而关系型数据库 TPS 一般相比于分布式 KV 系统会更低一些(简略地说,关系型数据库本身也是一个需要读写文件 server,这时 MQ 作为 client 与其建立连接并发送待持久化的消息数据,同时又需要依赖 DB 的事务等,这一系列操作都比较消耗性能),所以如果追求高效的 IO 读写,那么选择操作文件系统会更加合适一些。但是如果从易于实现和快速集成来看, 关系型数据库 DB > 分布式 KV 存储 > 文件系统,但是性能会下降很多。
另外,从消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,所以小编个人的理解是采用文件系统作为消息存储的方式,更贴近消息中间件本身的定义。
2.2、RocketMQ 消息存储整体架构
消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
// MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
// 将Message追加到文件commitlog
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
// 1.分发消息位置到ConsumeQueue
// 2.分发到IndexService建立索引
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
(1)RocketMQ 消息存储结构类型及缺点
上图即为 RocketMQ 的消息存储整体架构,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。而 Kafka 采用的是独立型的存储结构,每个队列一个文件。这里小编认为,RocketMQ 采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖 ConsumeQueue,构建该逻辑消费队列需要一定开销。
上面图中假设 Consumer 端默认设置的是同一个 ConsumerGroup,因此 Consumer 端线程采用的是负载订阅的方式进行消费。从架构图中可以总结出如下几个关键点:
(1)消息生产与消息消费相互分离,Producer 端发送消息最终写入的是 CommitLog(消息存储的日志数据文件),Consumer 端先从 ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 Tag 的 HashCode 值,随后再从 CommitLog 中进行读取待拉取消费消息的真正实体内容部分;
(2) RocketMQ 的 CommitLog 文件采用混合型存储(所有的 Topic 下的消息队列共用同一个 CommitLog 的日志数据文件),并通过建立类似索引文件 —ConsumeQueue 的方式来区分不同 Topic 下面的不同 MessageQueue 的消息,同时为消费消息起到一定的缓冲作用(只有 ReputMessageService 异步服务线程通过 doDispatch 异步生成了 ConsumeQueue 队列的元素后,Consumer 端才能进行消费)。这样,只要消息写入并刷盘至 CommitLog 文件后,消息就不会丢失,即使 ConsumeQueue 中的数据丢失,也可以通过 CommitLog 来恢复。
(3) RocketMQ 每次读写文件的时候真的是完全顺序读写么?这里,发送消息时,生产者端的消息确实是顺序写入 CommitLog;订阅消息时,消费者端也是 顺序读取 ConsumeQueue,然而根据其中的起始物理位置偏移量 offset 读取消息真实内容却是 随机读取 CommitLog。 在 RocketMQ 集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的,那么这里如何去优化和避免这个问题呢?后面的章节将会逐步来解答这个问题。
这里,同样也可以总结下 RocketMQ 存储架构的优缺点:
(1) 优点:
a、ConsumeQueue 消息逻辑队列较为轻量级;
b、对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致 IOWAIT 增高;
(2)缺点:
a、对于 CommitLog 来说写入消息虽然是顺序写,但是读却变成了完全的随机读;
b、Consumer 端订阅消费一条消息,需要先读 ConsumeQueue,再读 Commit Log,一定程度上增加了开销;
RocketMQ 文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为 5 层,下面将从各个层次分别进行分析和阐述:
(1)RocketMQ 业务处理器层:Broker 端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析 RemotingCommand 中的 RequestCode 来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造 MessageExtBrokerInner 对象、decode 反序列化、构造 Response 返回对象等;
(2) RocketMQ 数据存储组件层;该层主要是 RocketMQ 的存储核心类 —DefaultMessageStore,其为 RocketMQ 消息数据文件的访问入口,通过该类的 “putMessage ()” 和 “getMessage ()” 方法完成对 CommitLog 消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中 CommitLog 对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括 AllocateMappedFileService(MappedFile 预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker 主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等;
(3) RocketMQ 存储逻辑对象层:该层主要包含了 RocketMQ 数据文件存储直接相关的三个模型类 IndexFile、ConsumerQueue 和 CommitLog。IndexFile 为索引数据文件提供访问服务,ConsumerQueue 为逻辑消息队列提供访问服务,CommitLog 则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了 RocketMQ 存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中);
(4)封装的文件内存映射层:RocketMQ 主要采用 JDK NIO 中的 MappedByteBuffer 和 FileChannel 两种方式完成数据文件的读写。其中,采用 MappedByteBuffer 这种内存映射磁盘文件的方式完成对大文件的读写,在 RocketMQ 中将该类封装成 MappedFile 类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个 IndexFile 文件大小约为 400M、单个 ConsumerQueue 文件大小约 5.72M、单个 CommitLog 文件大小为 1G),其中每个分隔文件的文件名为前面所有文件的字节大小数 + 1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由 MappedFile 类提供读写操作服务(其中,MappedFile 类提供了顺序写 / 随机读、内存数据刷盘、内存清理等和文件相关的服务);
(5) 磁盘存储层:主要指的是部署 RocketMQ 服务器所用的磁盘。这里,需要考虑不同磁盘类型(如 SSD 或者普通的 HDD)特性以及磁盘的性能参数(如 IOPS、吞吐量和访问时延等指标)对顺序写 / 随机读操作带来的影响(ps:小编建议在正式业务上线之前做好多轮的性能压测,具体用压测的结果来评测);
(2)RocketMQ 消息存储架构深入分析
从上面的整体架构图中可见,RocketMQ 的混合型存储结构针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,
Producer 端:Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。
Consumer 端:Consumer 也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。退一步地讲,即使 Consumer 端第一次没法拉取到待消费的消息,Broker 服务端也能够通过长轮询机制等待一定时间延迟后再次发起拉取消息的请求。这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程 —ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据(ps:对于该服务线程在消息消费篇幅也有过介绍,不清楚的童鞋可以跳至消息消费篇幅再理解下)。然后,Consumer 即可根据 ConsumerQueue 来查找待消费的消息了。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。而 IndexFile(索引文件)则只是为了消息查询提供了一种通过 key 或时间区间来查询消息的方法(ps:这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程)。
(3)PageCache 与 Mmap 内存映射
这里有必要先稍微简单地介绍下 page cache 的概念。系统的所有文件 I/O 请求,操作系统都是通过 page cache 机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86 的 linux 中一个标准页面大小是 4KB。
操作系统内核在处理文件 I/O 请求时,首先到 page cache 中查找(page cache 中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘 I/O,将磁盘文件中的数据块加载到 page cache 中的一个空闲块,然后再 copy 到用户缓冲区中。
page cache 本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高 page cache 的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问 page cache 时,即使只访问 1k 的消息,系统也会提前预读取更多的数据,在下次读取消息时,就很可能可以命中内存。
在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page cache 机制的预读取作用下,Consume Queue 的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog 消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,比如设置调度算法为 “Noop”(此时块存储采用 SSD 的话),随机读的性能也会有所提升。
Mmap 内存映射技术 —MappedByteBuffer
(a)Mmap 内存映射技术的特点
Mmap 内存映射和普通标准 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读 / 写操作一样。只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在 1.5~2G 以下),采用 Mmap 的方式其读 / 写的效率和性能都非常高。
(b)JDK NIO 的 MappedByteBuffer 简要分析
从 JDK 的源码来看,MappedByteBuffer 继承自 ByteBuffer,其内部维护了一个逻辑地址变量 —address。在建立映射关系时,MappedByteBuffer 利用了 JDK NIO 的 FileChannel 类提供的 map () 方法把文件对象映射到虚拟内存。仔细看源码中 map () 方法的实现,可以发现最终其通过调用 native 方法 map0 () 完成文件对象的映射工作,同时使用 Util.newMappedByteBuffer () 方法初始化 MappedByteBuffer 实例,但最终返回的是 DirectByteBuffer 的实例。在 Java 程序中使用 MappedByteBuffer 的 get () 方法来获取内存数据是最终通过 DirectByteBuffer.get () 方法实现(底层通过 unsafe.getByte () 方法,以 “地址 + 偏移量” 的方式获取指定映射至内存中的数据)。
(c)使用 Mmap 的限制
a.Mmap 映射的内存空间释放的问题;由于映射的内存空间本身就不属于 JVM 的堆内存区(Java Heap),因此其不受 JVM GC 的控制,卸载这部分内存空间需要通过系统调用 unmap () 方法来实现。然而 unmap () 方法是 FileChannelImpl 类里实现的私有方法,无法直接显示调用。 RocketMQ 中的做法是,通过 Java 反射的方式调用 “sun.misc” 包下的 Cleaner 类的 clean () 方法来释放映射占用的内存空间;
b.MappedByteBuffer 内存映射大小限制;因为其占用的是虚拟内存(非 JVM 的堆内存),大小不受 JVM 的 - Xmx 参数限制,但其大小也受到 OS 虚拟内存大小的限制。一般来说,一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了;
c. 使用 MappedByteBuffe 的其他问题;会存在内存占用率较高和文件关闭不确定性的问题;
OS 的 PageCache 机制
PageCache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。
(1)对于数据文件的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(ps:顺序读入紧随其后的少数几个页面)。这样,只要下次访问的文件已经被加载至 PageCache 时,读取操作的速度基本等于访问内存。
(2)对于数据文件的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。
对于文件的顺序读写操作来说,读和写的区域都在 OS 的 PageCache 内,此时读写性能接近于内存。 RocketMQ 的大致做法是,将数据文件映射到 OS 的虚拟内存中(通过 JDK NIO 的 MappedByteBuffer),写消息的时候首先写入 PageCache,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘);订阅消费消息时(对 CommitLog 操作是随机读取),由于 PageCache 的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从 Page Cache 中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。
PageCache 机制也不是完全无缺点的,当遇到 OS 进行脏页回写,内存回收,内存 swap 等情况时,就会引起较大的消息读写延迟。
对于这些情况,RocketMQ 采用了多种优化技术,比如内存预分配,文件预热,mlock 系统调用等,来保证在最大可能地发挥 PageCache 机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。
另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种 Mmap 的方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了)。
2.3、RocketMQ 文件存储模型层次结构
三、RocketMQ 存储优化技术
这一节将主要介绍 RocketMQ 存储层采用的几项优化技术方案在一定程度上可以减少 PageCache 的缺点带来的影响,主要包括内存预分配,文件预热和 mlock 系统调用。
3.1 预先分配 MappedFile
在消息写入过程中(调用 CommitLog 的 putMessage () 方法),CommitLog 会先从 MappedFileQueue 队列中获取一个 MappedFile,如果没有就新建一个。
这里,MappedFile 的创建过程是将构建好的一个 AllocateRequest 请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为 AllocateRequest 对象)添加至队列中,后台运行的 AllocateMappedFileService 服务线程(在 Broker 启动时,该线程就会创建并运行),会不停地 run,只要请求队列里存在请求,就会去执行 MappedFile 映射文件的创建和预分配工作,分配的时候有两种策略,一种是使用 Mmap 的方式来构建 MappedFile 实例,另外一种是从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile(ps:具体采用哪种策略,也与刷盘的方式有关)。并且,在创建分配完下个 MappedFile 后,还会将下下个 MappedFile 预先创建并保存至请求队列中等待下次获取时直接返回。RocketMQ 中预分配 MappedFile 的设计非常巧妙,下次获取时候直接返回就可以不用等待 MappedFile 创建分配所产生的时间延迟。
3.2 文件预热 &&mlock 系统调用
(1)mlock 系统调用:其可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。对于 RocketMQ 这种的高吞吐量的分布式消息队列来说,追求的是消息读写低延迟,那么肯定希望尽可能地多使用物理内存,提高数据读写访问的操作效率。
(2)文件预热:预热的目的主要有两点;第一点,由于仅分配内存并进行 mlock 系统调用后并不会为程序完全锁定这些内存,因为其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ 是在创建并分配 MappedFile 的过程中,预先写入一些随机值至 Mmap 映射出的内存空间里。第二,调用 Mmap 进行内存映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。程序要访问数据时 OS 会检查该部分的分页是否已经在内存中,如果不在,则发出一次缺页中断。这里,可以想象下 1G 的 CommitLog 需要发生多少次缺页中断,才能使得对应的数据才能完全加载至物理内存中(ps:X86 的 Linux 中一个标准页面大小是 4KB)? RocketMQ 的做法是,在做 Mmap 内存映射的同时进行 madvise 系统调用,目的是使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。