引言
功能介绍
简单来说,消息队列就是基础数据结构课程里 “先进先出” 的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠性,并且还能应对大流量的冲击,对消息队列的要求就很高了。现在互联网 “微架构” 模式兴起,原有大型集中式的 IT 服务因为各种弊端,通常被分拆成细粒度的多个“微服务”,这些微服务可以在一个局域网内,也可能跨机房部署。一方面对服务之间松耦合的要求越来越高,另一方面,服务之间的联系却越来越紧密,对通信质量的要求也越来越高。分布式消息队列可以提供应用解耦、流量削峰、消息分发等功能,已经成为大型互联网服务架构里标配的中间件。
应用解耦
复杂的应用里会存在多个子系统,比如在电商应用中有订单系统、库存系统、物流系统、支付系统等。这个时候如果各个子系统之间的耦合性太高,整体系统的可用性就会大幅降低。多个低错误率的子系统强耦合在一起,得到的是一个高错误率的整体系统。
以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
如下图所示,当转变成基于消息队列的方式后,系统可用性就高多了,比如物流系统因为发生故障,需要几分钟的时间来修复,在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列里,用户的下单操作可以正常完成。当物流系统恢复后,补充处理存储在消息队列里的订单信息即可,终端用户感知不到物流系统发生过几分钟的故障。
流量削峰
电商服务中一般都会有大促活动,在大促时,大部分应用系统流量会在瞬间猛增,这个时候如果没有缓冲机制,不可能承受住短时大流量的冲击。通过利用消息队列,把大量的请求暂存起来,分散到相对长的一段时间内处理,能大大提高系统的稳定性和用户体验。
举个例子,如果订单系统每秒最多能处理一万次下单,这个处理能力应对正常时段的下单是绰绰有余的,正常时段我们下单后一秒内就能返回结果。在大促活动时,如果没有消息队列这种缓冲机制,为了保证系统稳定,只能在订单超过一万次后就不允许用户下单了。如果有消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单后十几秒才能收到下单成功的状态,但是也比不能下单的体验要好。
使用消息队列进行流量削峰,很多时候不是因为能力不够,而是出于经济性的考量。
消息分发
在大数据时代,数据对很多公司来说就像金矿,公司需要依赖对数据的分析,进行用户画像、精准推送、流程优化等各种操作,并且对处理的实时性要求越来越高。数据是不断产生的,各个分析团队、算法团队都要依赖这些数据来进行工作,这个时候有个可持久化的消息队列就非常重要。数据的产生方只需要把各自的数据写入一个消息队列即可,数据使用方根据各自需求订阅感兴趣的数据,不同数据团队所订阅的数据可以重复也可以不重复,互不干扰,也不必和数据产生方关联。
如下图所示,各个子系统将日志数据不停地写入消息队列,不同的数据处理系统有各自的 Offset,互不影响。甚至某个团队处理完的结果数据也可以写入消息队列,作为数据的产生方,供其他团队使用,避免重复计算。在大数据时代,消息队列已经成为数据处理系统不可或缺的一部分。
方便动态扩容
我们已经知道,消息队列可以帮我们缓存用户的请求,让我们有更加宽裕的时间来处理这些请求,那么对于请求越积压越多的情况,显然只通过现存下游服务持续消费是无法满足的。这时候,就需要根据消息队列中数据积压的情况动态的增加下游服务的节点数,避免消息越积压越多,最后到无法控制的地步。
保证最终一致性
既然前面已经引入了 “微服务” 的概念,必然就会牵扯出分布式事务的问题,而业内解决分布式事务问题基本是采用如下两套方案: - 基于 TCC 的事务框架 - 消息队列
如果服务双方是同步调用,即要么一起成功,要么一起失败,则此时应该选用 TCC 的事务框架,这部分内容我们以后会写一篇文章介绍分布式事务框架————seata,那时再专门进行说明。
如果服务双方是异步调用,即上游服务落库后立即返回,不等待下游服务的执行结果时,一般都会采用消息队列来实现。下面的例子虽然不是 RocketMQ 的方案,但是是使用消息队列解决最终一致性问题的一个通用方案,我们不妨先来看一看。
图中的过程表述的是将支付宝中的钱转入余额宝,操作步骤如下:
1. 支付宝将 zhansan 的余额扣除 100 并将分布式事务消息记录写入本地库中
- 借助了本地数据库的 ACID 属性,保证余额落库和消息记录落库的 ACID
2. 后台有一个定时程序,将本地库中未成功发送到消息队列的消息进行发送,消息中既包含余额宝要处理的账户的金额,也包含分布式事务的 id
- 引入本地消息记录表的意义:如果步骤 1 中直接余额落库,并发送消息,无法保证其原子性。可能消息发送成功了,但是最终本地事务提交失败。如果先提交本地事务,再发送消息,也可能本地提交成功,但是消息未发送。所以在步骤 1 中,将余额和消息记录同时落库,最后让定时任务去扫描未发送的消息,并进行消息发送,这时候发送成功后再将消息记录标记为已发送
- 这样虽然保证了,消息一定会被发送到消息队列,但是没办法保证消息只发送 1 份。因为可能消息发送成功后还没来得及修改本地消息记录的状态,就停机了。这时,重启服务器后,会产生重复的消息,这就需要下游服务提供幂等性支持(重复处理同一条消息,不会造成数据错误)
3. 余额宝从消息队列中拿到消息后,在同一个事务中将消息中存储的分布式事务 id 存储在本地的消息处理表中,然后修改 zhansan 的余额,最后提交事务
- 这样,如果收到一条重复的消息,在将消息插入到本地消息处理表时,就会发生事务 id 重复的错误,让事务回滚,从而保证了幂等性
至此,大家应该已经了解了通过消息队列来处理分布式事务的通用解决方案了,在这个例子中我们可以看出,消息的 Provider 需要在自己的服务中添加一个消息发送表,并维护一个循环任务来发送消息。这对其来说有很大的服务侵入性,在本文的后段,我会介绍 RocketMQ 的分布式事务方案,它通过自己的一些机制,降低了对消息 Provider 的侵入性。
设计理念与目标
设计理念
RocketMQ 设计基于主题的发布与订阅模式,其核心功能包括消息发送,消息存储(Broker),消息消费,整体设计追求简单与性能第一,主要体现在如下三个方面。
首先,NameServer 设计极其简单,摒弃了业界常用的使用 Zookeeper 充当信息管理的 “注册中心”,而是自研 NameServer 来实现元数据的管理 (Topic 路由信息等)。从实际需求出发,因为 Topic 路由信息无须在集群之间保持强一致,追求最终一致性,并且能容忍分钟级的不一致。正是基于此种情况,RocketMQ 的 NameServer 集群之间互不通信,极大地降低了 NameServer 实现的复杂程度,对网络的要求也降低了不少,但是性能相比较 Zookeeper 有了极大的提升。
其次是高效的 IO 存储机制。RocketMQ 追求消息发送的高吞吐量,RocketMQ 的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引人内存映射机制,所有主题的消息存储基于顺序写,极大地提供了消息写性能,同时为了兼顾消 息消费与消息查找,引入了消息消费队列文件与索引文件。
最后是容忍存在设计缺陷,适当将某些工作下放给 RocketMQ 使用者。消息中间件的实现者经常会遇到一个难题: 如何保证消息一定能被消息消费者消费,并且保证只消费一次。RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效消息重复问题由消费者在消息消费时实现幂等。
设计目标
RocketMQ 作为一款消息中间件,需要解决如下问题。 架构模式 RocketMQ 与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括: 消息发送者、消息服务器 (消息存储)、消息消费、路由发现。 顺序消费 所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费。RocketMQ 可以严格保证消息有序,但是相较于无序队列来说,性能上会有很大的损失,不过这也是在所难免的。 消息过滤 消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制。 - 消息在 Broker 端过滤。Broker 只将消息消费者感兴趣的消息发送给消息消费者。 - 消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从 Broker 传输到消费端。
消息存储 消息中间件的一个核心实现是消息的存储对消息存储一般有如下两个维度的考量: 消息堆积能力和消息存储性能。RocketMQ 追求消息存储的高性能,引人内存映射机制,所有主题的消息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。 消息高可用性 通常影响消息可靠性的有以下几种情况。 1. Broker 正常关机。 2. Broker 异常 Crash。 3. OS Crash。 4. 机器断电,但是能立即恢复供电情况 。 5. 机器无法开机 (可能是 CPU、主板、内存等关键设备损坏)。 6. 磁盘设备损坏。
针对上述情况,情况 1~4 的 RocketMQ 在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息。情况 5-6 属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ 能保证只丢失少量消息,如果使用 Master Slave 双写机制,可以保证不丢失消息,从而满足消息可靠性要求极高的场合。 消费低延迟 RocketMQ 在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。 确保消息必须被消费一次 RocketMQ 通过消息消费确认机制 (ACK) 来确保消息至少被消费一次,但由于 ACK 消息有可能丢失等其他原因,RocketMQ 无法做到消息只被消费一次,有重复消费的可能。 回溯消息 回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ 支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。 消息堆积 消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ 消息存储使用磁盘文件 (内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ 消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留 3 天。 定时消息 定时消息是指消息发送到 Broker 后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故 RocketMQ 不支持任意进度的定时消息,而只支持特定延迟级别。 消息重试机制 消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ 支持消息重试机制。
架构
RocketMQ 集群中包含 4 个模块:Namesrv,Broker,Producer,Consumer。 - Namesrv: 存储当前集群所有 Brokers 信息、Topic 跟 Broker 的对应关系。 - Broker: 集群最核心模块,主要负责 Topic 消息存储、消费者的消费位点管理(消费进度)。 - Producer: 消息生产者,每个生产者都有一个 ID(编号),多个生产者实例可以共用同一个 ID。同一个 ID 下所有实例组成一个生产者集群。 - Consumer: 消息消费者,每个订阅者也有一个 ID(编号),多个消费者实例可以共用同一个 ID。同一个 ID 下所有实例组成一个消费者集群。
接下来,我们将按照 RocketMQ 中的模块,挨个介绍其实现方案。
NameServer
本节主要介绍 RocketMQ 路由管理、服务注册及服务发现的机制,NameServer 是整个 RocketMQ 的 “大脑”。相信大家对“服务发现” 这个词语并不陌生,分布式服务 SOA 架构体系中会有服务注册中心,分布式服务 SOA 的注册中心主要提供服务调用的解析服务,指引服务调用方 (消费者) 找到 “远方” 的服务提供者,完成网络通信,那么 RocketMQ 的路由中心存储的是什么数据呢? 作为一款高性能的消息中间件,如何避免 NameServer 的单点故障,提供高可用性呢?
Broker 消息服务器在启动时向所有 NameServer 注册,消息生产者 (Producer) 在发送消息之前先从 NameServer 获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer 与每台 Broker 服务器保持长连接,并间隔 30s 检测 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这样设计呢? 这是为了降低 NameServer 实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。
NameServer 本身的高可用可通过部署多台 NameServer 服务器来实现,但彼此之间互不通信,也就是 NameServer 服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是 RocketMQ NameServer 设计的一个亮点,RocketMQ NameServer 设计追求简单高效。
存储内容
private final HashMap
private final HashMap
private final HashMap
private final HashMap
private final HashMap
- topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡。
- brokerAddrTable:Broker 基础信息,包含 brokerName、所属集群名称、主备 Broker 地址。
- clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称。
- brokerLiveTable:Broker 状态信息, NameServer 每次收到心跳包时会替换该信息。
- filterServerTable:Broker 上的 FilterServer 列表,用于类模式消息过滤。
QueueData、BrokerData、BrokerLiveInfo 类图如下图所示。
RocketMQ 2 主 2 从部署图如下所示。
对应运行时数据结构如下图所示。
路由注册
RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30s 向集群中所有 NameServer 发送心跳包,NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp,然后 NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息同时关闭 Socket 连接。
心跳包
- brokerAddr:broker 地址。
- brokerId:brokerId,O:Master;大于 0:Slave。
- brokerName:broker 名称。
- clusterName: 集群名称。
- haServerAddr:master 地址,初次请求时该值为空,slave 向 NameServer 注册后返回其 MasterAddr。
- requestBody:
- filterServerList:消息过滤服务器列表。
- topicConfigWrapper: 主题配置。
从心跳包内容我们会发现,每次心跳包中都会包含所有的 topic 信息,如果一个 broker 上 topic 非常多的话,心跳包就会比较大,如果正好赶上网络不好的时候,可能就会导致 broker 下线。
NameServer 与 Broker 保持长连接,Broker 状态存储在 brokerLiveTable 中,NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 Broker 的状态信息以及路由表 (topicQueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable)。
路由删除
Broker 每隔 30s 向 NameServer 发送一个心跳包,心跳包中包含 BrokerId、Broker 地址、Broker 名称、Broker 所属集群名称、Broker 关联的 FilterServer 列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔 1Os 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker, 关闭与 Broker 连接,并同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ 有两个触发点来触发路由删除: 1. NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120s,则需要移除该 Broker 信息。 2. Broker 在正常被关闭的情况下,会执行 unRegisterBroker 指令, 主动删除 NameServer 中关于自己的信息。
路由发现
RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不主动推送给客户端,而是由客户端定时拉取主题最新的路由。
工作示意图
消息发送
RocketMQ 支持 3 种消息发送方式: 同步 (sync)、异步 (async)、单向 (oneway)。 - 同步: 发送者向 MQ 执行发送消息 API 时,同步等待,直到消息服务器返回发送结果。 - 异步: 发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回调函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。 - 单向: 消息发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
消息内容
private String topic;
private int flag;
private Map
private byte[] body;
private String transactionId;
Message 的基础属性主要包括消息所属主题 topic,消息 Flag,扩展属性,消息体,事务 ID。
消息 Flag 的定义如下,可以看出其主要和事务支持有关,关于 RocketMQ 的事务机制,我们后面会介绍:
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
Message 扩展属性主要包含下面几个。 - tag: 消息 TAG,用于消息过滤。 - keys:Message 索引键,多个用空格隔开,RocketMQ 可以根据这些 key 快速检索到消息。 - waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。 - delayTimeLevel: 消息延迟级别,用于定时消息或消息重试。
这些扩展属性存储在 Message 的 properties 中。
发送流程
消息发送流程主要的步骤: 验证消息、查找路由、消息发送 (包含异常处理机制)。
消息验证
消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于 0 且默认不能超过允许发送消息的最大长度 4M(maxMessageSize=l024 _1024 _4)。
查找路由
消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的 Broker 节点。
如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向 NameServer 查询该 topic 的路由信息。如果最终未找到路由信息,则抛出异常: 无法找到主题相关路由信息异常。
这里就有一个问题,如果整个消息队列服务刚运行,各个 topic 的路由信息是如何创建出来的?一般来说会有两个方案: 1. 在生产者发送消息之前,就人工创建好各个 topic 的路由信息,这样做的好处是,可以根据该 topic 消息的实际需求,分配合适的 broker 数量和消息队列数量。一般来说,生产环境的服务都推荐以这种方式进行。 2. 可以配置各个 Broker,打开其自动创建 topic 的功能(BrokerConfig#autoCreateTopicEnable),这样就会在发送第一个消息时,动态的创建该 topic 的路由信息。
自动创建 topic 路由的过程如下:
1. Broker 如果开启了自动创建 topic 功能,则创建默认主题路由信息,并通过心跳包告知 NameServer
2. Producer 查询本地路由缓存,未找到新 topic 的路由信息
3. Producer 查询 NameServer,未找到新 topic 的路由信息
4. Producer 查询 NameServer,找到默认主题路由信息
5. Producer 根据默认主题路由信息,将消息发送到默认主题的其中一个 Broker
6. 收到默认主题消息的 Broker,根据消息的原始 topic,创建相应的路由信息,并通过心跳包告知 NameServer
7. Producer 下次发送该 topic 的消息时
- 如果已经存在该消息的路由(定时拉取):则直接根据路由发送消息
- 如果该消息的路由还没来得及同步:则继续发送到默认主题
从上面的自动创建 topic 流程中,我们会发现,如果新创建的 topic 信息没有来得及同步时,再次发送消息,可能会在其他 Broker 也创建该 topic 的队列,但是如果只是发送了一条该 topic 的消息后就等待一段时间,等路由信息同步完成后,再发送就会出现整个消息队列集群中,只有一个 broker 负责该 topic,这样就对并发性产生较大的影响,试想一下,你的消息队列本来有 10 个 Broker 节点,他们都配置成自动创建 Topic,然后 10 个 Producer 分别发送不同的 topic 消息,但是它们都只发送了一条消息就休息了一段时间,这 10 个 Producer 根据路由选择策略,碰巧都选择了同一个 Broker,那么最后这个消息队列集群,就只有一个 Broker 在工作,其负担了所有 topic 的任务。
上面的例子,虽然有些极端,但是这也正是生产环境中不使用自动创建 Topic 策略的原因。除了这种极端情况,可能上例中的每个 Producer 都在对应 topic 路由信息同步前,将消息发送到了多个 Broker,这些 Broker 都会创建相应 topic,那么每个 Topic 都会由多个 Broker 负责,这样整个服务的并发能力就会得到充分的利用。
路由选择
根据路由信息选择消息队列,返回的消息队列按照 broker、序号排序。举例说明,如果 topicA 在 broker-a,broker-b 上分别创建了 4 个队列,那么返回的消息队列如下:
[
{
“brokerName”:”broker-a”,
“queueId”: 0
},
{
“brokerName”:”broker-a”,
“queueId”: 1
},
{
“brokerName”:”broker-a”,
“queueId”: 2
},
{
“brokerName”:”broker-a”,
“queueId”: 3
},
{
“brokerName”:”broker-b”,
“queueId”: 0
},
{
“brokerName”:”broker-b”,
“queueId”: 1
},
{
“brokerName”:”broker-b”,
“queueId”: 2
},
{
“brokerName”:”broker-b”,
“queueId”: 3
}
]
首先消息发送端采用重试机制,由 retryTimesWhenSendFailed 指定同步方式重试次数,异步重试机制在收到消息发送请求后,执行回调之前进行重试,由 retryTimesWhenSendAsyncFailed 指定。接下来就是循环执行:选择消息队列、发送消息,发送成功则返回,收到异常则重试。
如果在一次消息发送的过程中消息发送失败了,那么在下次重试的过程中,会排除掉上次失败的 Broker。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) { //空表示第一次发送
return selectOneMessageQueue(); // 循环选择一个队列
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {// 遍历所有队列
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {// 排除上一次的broker
return mq;
}
}
return selectOneMessageQueue(); // 如果只有一个broker,则继续循环选择套路
}
}
public MessageQueue selectOneMessageQueue() { // 循环选择
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
从上面的实现中,可以看出在一次消息发送的过程中,可以通过重试机制绕开失败过的 Broker,但是如果是发送多个消息,上述机制就无法绕开会失败的 Broker。
我们前面已经说过,如果 Broker 宕机,可能会花费很长时间才能同步到各个 Producer,那么怎么在 Broker 宕机的信息同步到 Producer 之前,绕开它而将消息发送到别的正常 Broker 上呢,这就不得不提 RocketMQ 的另一个容错机制————故障延迟机制。
首先我们需要知道 RocketMQ 在哪里存储失败的 Broker,这些信息都存在 LatencyFaultToleranceImpl 中,其中有一个 ConcurrentHashMap
// FaultItem fields
private final String name;
private volatile long currentLatency; //请求该节点的耗时
private volatile long startTimestamp; //预估下次可用的时间点
public boolean isAvailable() { // 当前时间大于等于预估可用点时,则认为可用
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
FaultItem 的信息,在每次消息发送成功,和消息发送失败时,进行更新。 - 如果消息发送成功,currentLatency 赋值为本次请求的实际耗时 - 消息发送失败,currentLatency 赋值为 30s
startTimestamp 是根据 currentLatency 进行设置的,RocketMQ 将 currentLatency 分成了不同的档位,不同档位的 currentLatency 会对应不同的 notAvailableDuration,然后: startTimestamp=System.currentTimeMillis() + notAvailableDuration currentLatency 与 notAvailableDuration 的对应关系如下图:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 按从大到小的顺序,找到 currentLatency 所在的区间,然后输出该区间对应的不可用时长
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i—) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
由此,我们可以算出如果消息发送失败,那么 RocketMQ 正常来说会禁用该 Broker 十分钟。
知道了 RocketMQ 如何存储失败节点,在让我们来看看它是如何利用该信息,达到避开失败节点的效果。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, /通常情况下,指上次请求失败时用到的节点/final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
// 外层轮询,下次请求时会选择下一个队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// index用于内层轮询,从而排除不可用的节点
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证可用性,latencyFaultTolerance存储了各个Broker发送消息的耗时,已经预估的下次可用时间点
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// 如果是第一次请求,并且可用,则直接返回
// 如果是重试请求,则只使用brokerName等于lastBrokerName相同的,这点大家肯定有疑问:为啥要使用上次失败的,其实这里的lastBrokerName不单单指上次发送失败的节点,
// 它还能蕴含推荐节点的信息,本函数的后半段中会看到如何将推荐节点的信息传递到lastBrokerName
// 但是我觉得这样写的一个弊端是:当前可用节点,如果和上次失败时所用的节点不一致时,就会被排除掉,这也会影响效率
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 走到这一步,代表所有节点都不可用(首次请求)或者上一次失败时用到的节点仍然不可用(重试请求)
// 随后,将所有不可用的节点,按照潜在可用性(当前可用与否>上次使用该节点时的调用耗时>预估的下次可使用时间),进行了排序,然后选择最优的结果
// 这里不用担心,连续重试时pickOneAtLeast每次都选择了相同节点,因为其内部也是用了轮训机制,会从最优->次优的顺序给出下一次推荐的节点
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//getQueueIdByBroker这个函数名也是惊到我了,完全和其功能不匹配,本行代码意在得到推荐节点是否仍然存在可写队列,如果存在,得出队列数
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 我们得到的推荐节点存在可写队列
if (writeQueueNums > 0) {
// 至此我们已经拿到了一个推荐节点,但是接下来代码的作者并没有简单地根据推荐节点来寻找队列,而是靠外层轮训找了下一个队列
// 如果再次重试过程中没有发生路由信息更新的话,该队列应该仍然是不可用的,并且很可能仍是最初失败的Broker节点的队列,
// 为什么这么说:假如消息队列为[BrokerA1,BrokerA2,BrokerA3,BrokerA4,BrokerB1,BrokerB2],只有当上一次轮训到BrokerA4时,这里才会跳过BrokerA而得到BrokerB的队列
// 而且,可能下次更新路由表时,该信息可能就会成为过期数据而被GC,我猜作者是觉得反正这个数据快没用了,不如把它替换成刚才得到的推荐节点信息,这样可以少new一个对象,还能增加下次查找时的效率
// 之所以说它能增加效率,是因为这个过程实质上是将一个很可能宕机的节点队列换成了最可能可用的节点信息,那么下次再轮训到这个节点时,实际上就跳过了寻找推荐节点的过程
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
// 把得到的队列的BrokerName改成我们前面得到的推荐节点,这样如果再请求失败并且重试的时候,lastBrokerName其实存储的就是推荐节点的信息了,下次再执行本函数时就会优先使用推荐节点的其他队列
mq.setBrokerName(notBestBroker);
// 重新计算其队列编号,因为得到的这个队列数可能和推荐节点的队列数不一致,如果用了错误的队列序号,消息发送到Broker那时,肯定会报错
// 因此,这里基于外层轮询使用的index,对本次使用的队列编号进行了计算,我觉得这里最终达到的是随机选择的效果
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else { // 推荐节点不存在可写队列了,说明该节点可能已经宕机,并且NameServer已经删除了其路由信息,并且已经同步过来了
// 这时候可以将该节点从延迟统计表中删除,不在考虑该节点
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error(“Error occurred when selecting message queue”, e);
}
// 如果拿到的推荐节点已经不存在可写队列了,就随机选一个队列
return tpInfo.selectOneMessageQueue();
}
// 默认策略
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
在我看来,RocketMQ 的队列选择算法很恐怖,我不确定是特意设计成这样的,还是历史发展出来的奇怪产物。Github 仓库中,也有很多人提 issue 问这个函数的设计深意。而且,其他人的文章中对该函数的介绍都是一笔带过。我这里,以我的理解对该算法进行了详细的分析,可能有些地方理解的不对,希望大家在留言中指出。
简单的说,上述算法按照如下流程工作: 1. 轮训所有队列,通过 LatencyFaultTolerance 找到可用队列 2. 如果未找到任何可用队列,通过 LatencyFaultTolerance 存储的信息,按照三个纬度的可用性排序(当前可用与否 > 上次使用该节点时的调用耗时 > 预估的下次可使用时间),选出最可能可用的队列 3. 如果上述两个步骤都没有选出队列,则按照最简单的轮训找到下一个队列
消息发送
- 根据 MessageQueue 获取 Broker 的网络地址。如果找不到 Broker 信息,则抛出 MQClientException,提示 Broker 不存在。
- 为消息分配全局唯一 ID,如果消息体默认超过 4K(compressMsgBodyOverHowMuch), 会对消息体采用 zip 压缩,并设置消息的系统标记为 MessageSysFlag.COMPRESSED_FLAG。如果是事务 Prepared 消息,则设置消息的系统标记为 MessageSysFlag.TRANSACTION_PREPARED_TYPE。
- 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
- 构建消息发送请求包。主要包含如下重要信息: 生产者组、主题名称、默认创建主题 Key、该主题在单个 Broker 默认队列数、队列 ID(队列序号)、消息系统标记 (MessageSysFlag)、消息发送时间、消息标记 (RocketMQ 对消息中的 flag 不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
- 根据消息发送方式,同步、异步、单向方式进行网络传输。
- 如果注册了消息发送钩子函数,执行 after 逻辑。注意,就算消息发送过程中发生 RemotingException、MQBrokerException、 InterruptedException 时该方法也会执行。
异步发送 消息异步发送是指消息生产者调用发送的 API 后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ 对消息发送的异步消息进行了并发控制,通过参数 clientAsyncSemaphoreValue 来控制,默认为 65535。异步消息发送虽然也可以通过 DefaultMQProducer#retryTimesWhenSendAsyncFailed 属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。
单向发送 单向发送是指消息生产者调用消息发送的 API 后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。
批量发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过 DefaultMQProducer#maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
RocketMQ 对批量消息使用固定格式进行存储,如下图所示。
消息存储
通过前面的知识,我们已经知道了 topic 是如何分配到 Broker 的,以及消息发送方是如何决定把消息发送给哪个 Broker 的,接下来我们看一看 Broker 介绍到消息后,是怎么存储消息的。
RocketMQ 主要存储的文件包括 CommitLog 文件、ConsumeQueue 文件、IndexFile 文件。RocketMQ 将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ 引入了 ConsumeQueue 消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile 索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从 CommitLog 文件中检索消息。
磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概 1OOKB/s, 和顺序写的性能相差 6000 倍! 因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
存储方案
RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 CommitLog, ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。
CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享。在 CommitLog 中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向 CommitLog 中顺序写,但是随机读。ConsumeQueue 的内容也会被写到磁盘里作持久存储,只不过是通过异步刷盘的方式进行。
这样设计的优点: 1. CommitLog 顺序写,可以大大提高写入效率。接收到消息时,只有 CommitLog 是需要同步刷盘的(根据配置,可能也不需要同步刷盘),其他文件都是异步保存,如果发生了宕机,RocketMQ 可以根据 CommitLog 恢复 ConsumeQueue 文件和 IndexFile。 2. 虽然 CommitLog 是随机读,但是利用操作系统的 page cache 机制,可以批量地从磁盘读取,cache 存到内存中之后,加速后续的读取速度。 3. 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构,因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读人内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 Consume Queues、Message keys、Tags 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。
下图是一个 Broker 在文件系统中存储的各个文件。我们可以看到 CommitLog 文件夹、ConsumeQueue 文件夹,还有在 config 文件夹中 Topic、Consumer 的相关信息。最下面那个文件夹 index 存的是索引文件,这个文件用来加快消息查询的速度。
- commitlog: 消息存储目录。 - config: 运行期间一些配置信息,主要包括下列信息。 - consumerFilter.json: 主题消息过滤信息。 - consumerOffset.json: 集群消费模式消息消费进度。 - delayOffset.json: 延时消息队列拉取进度。 - subscriptionGroup: 消息消费组配置信息。 - topic.json:topic 配置属性。 - consumequeue: 消息消费队列存储目录。 - index: 消息索引文件存储目录。 - abort: 如果存在 abort 文件说明 Broker 非正常关闭,该文件默认启动时创建,正常退出之前删除。 - checkpoint: 文件检测点存储 commitlog 文件最后一次刷盘时间戳、consumequeue 最后一次刷盘时间、index 索引文件最后一次刷盘时间戳。
存储流程
- 如果当前 Broker 停止工作或 Broker 为 SLAVE 角色或当前 Broker 不支持写入则拒绝消息写入; 如果消息主题长度超过 256 个字符、消息属性长度超过 65536 个字符将拒绝该消息写入。
- 如果消息的延迟级别大于 0,将消息的原主题名称与原消息队列 ID 存入消息属性中,用延迟消息主题 SCHEDULE_TOPIC、消息队列 ID 更新原先消息的主题与队列。
- 获取当前可以写入的 CommitLog 文件
- 在写入 CommitLog 之前,先申请 putMessageLock,也就是将消息存储到 CommitLog 文件中是串行的
- 设置消息的存储时间,如果 CommitLog 文件不存在就需要创建新的文件
- 创建全局唯一消息 ID
- 获取该消息在消息队列的偏移量
- 计算消息总长度,并写入 CommitLog
- 如果计算发现 CommitLog 无法存储所有内容,则创建新的 CommitLog,文件名为即将插入消息的偏移
- 将消息内容写入 CommitLog 文件后根据配置进行同步或者异步刷盘
- 更新逻辑偏移量,并释放 putMessageLock
- 根据 CommitLog 偏移量,消息存储大小,tag 的 hash 值插入一条 Message 到 ConsumeQueue
- 根据 Key 的 hash 值,CommitLog 偏移量,插入一条数据到 IndexFile
- ConsumerQueue 每隔一段时间自动刷盘、IndexFile 在每次创建新 indexFile 时刷盘之前的索引文件、checkpoint 文件在刷盘 ConsumeQueue 和 IndexFile 时进行更新
内存映射
RocketMQ 通过使用内存映射文件来提高 IO 访问性能,无论是 CommitLog、 ConsumeQueue 还是 IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。例如 CommitLog 的文件组织方式如下图所示。
RocketMQ 使用 MappedFile、 MappedFileQueue 来封装存储文件。
MappedFileQueue
MappedFileQueue 是 MappedFile 的管理容器,MappedFileQueue 是对存储目录的封装。MappedFileQueue 类的核心属性如下:
private final String storePath; // 存储目录
private final int mappedFileSize; // 单个文件的存储大小
private final CopyOnWriteArrayList
private final AllocateMappedFileService allocateMappedFileService; // 创建MappedFile服务类
private long flushedWhere = 0; // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
private long committedWhere = 0; // 当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushedWhere
private volatile long storeTimestamp = 0; // 刷盘时间戳
知道了 MappedFileQueue 的存储内容之后,让我们来看看通过它,我们都能做什么。 通过时间查找消息所在的文件 从 MappedFile 列表中第一个文件开始查找,找到第一个最后一次更新时间大于待查找时间戳的文件,如果不存在,则返回最后一个 MappedFile 文件。 通过偏移量查找消息所在的文件 因为 RocketMQ 会定时清除过期的数据,所以第一个 MappedFile 对应的偏移量不一定是 00000000000000000000,所以根据偏移量计算文件位置的算法为:查找偏移量 / 单个文件的大小 - 第一个文件的起始偏移量 / 单个文件的大小
(int)((offset / mappedFileSize) - (getFirstMappedFile().getFileFromOffset() / this.mappedFileSize));
MappedFile
MappedFile 是 RocketMQ 内存映射文件的具体实现,其核心属性如下:
// 操作系统每页大小,默认4k
public static final int OSPAGE_SIZE = 1024 * 4;
// 当前JVM实例中MappedFile虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前JVM实例中MappedFile对象个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 当前该文件的写指针,从0开始(内存映射文件中的写指针)
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 当前文件的提交指针,如果开启transientStorePoolEnable,则数据会存储在TransientStorePool中,然后提交到内存映射ByteBuffer中,再刷写到磁盘。
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 刷写到磁盘指针,该指针之前的数据持久化到磁盘中
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 文件通道
protected FileChannel fileChannel;
/
Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
/
// 堆外内存ByteBuffer,如果不为空,数据首先将存储在该Buffer中,然后提交到MappedFile对应的内存映射文件Buffer。transientStorePoolEnable为true时不为空。_
protected ByteBuffer writeBuffer = null;
// 堆外内存池,transientStorePoolEnable为true时启用。
protected TransientStorePool transientStorePool = null;
// 文件名称
private String fileName;
// 该文件的初始偏移量
private long fileFromOffset;
// 物理文件
private File file;
// 物理文件对应的内存映射Buffer
private MappedByteBuffer mappedByteBuffer;
// 文件最后一次内容写入时间
private volatile long storeTimestamp = 0;
// 是否是MappedFileQueue队列中第一个文件
private boolean firstCreateInQueue = false**;
在详细介绍 RocketMQ 的 MappedFile 之前,我们先插播一段关于 MappedByteBuffer 的介绍,它是 RocketMQ 实现内存映射的关键,也是 Java 官方给出的内存映射方案。
MappedByteBuffer
在深入 MappedByteBuffer 之前,先看看计算机内存管理的几个术语: - MMC:CPU 的内存管理单元。 - 物理内存:即内存条的内存空间。 - 虚拟内存:计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间),而实际上,它通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。 - 页面文件:物理内存被占满后,将暂时不用的数据移动到硬盘上。 - 缺页中断:当程序试图访问已映射在虚拟地址空间中但未被加载至物理内存的一个分页时,由 MMC 发出的中断。如果操作系统判断此次访问是有效的,则尝试将相关的页从虚拟内存文件中载入物理内存。
如果正在运行的一个进程,它所需的内存是有可能大于内存条容量之和的,如内存条是 256M,程序却要创建一个 2G 的数据区,那么所有数据不可能都加载到内存(物理内存),必然有数据要放到其他介质中(比如硬盘),待进程需要访问那部分数据时,再调度进入物理内存。
假设你的计算机是 32 位,那么它的地址总线是 32 位的,也就是它可以寻址 0xFFFFFFFF(4G)的地址空间,但如果你的计算机只有 256M 的物理内存 0x0FFFFFFF(256M),同时你的进程产生了一个不在这 256M 地址空间中的地址,那么计算机该如何处理呢?
计算机会对虚拟内存地址空间(32 位为 4G)进行分页,从而产生页(page),对物理内存地址空间(假设 256M)进行分页产生页帧(page frame),页和页帧的大小一样,所以虚拟内存页的个数势必要大于物理内存页帧的个数。在计算机上有一个页表(page table),就是映射虚拟内存页到物理内存页的,更确切的说是页号到页帧号的映射,而且是一对一的映射。
那么问题来了,虚拟内存页的个数 > 物理内存页帧的个数,岂不是有些虚拟内存页的地址永远没有对应的物理内存地址空间?不是的,操作系统是这样处理的:如果要用的页没有找到,操作系统会触发一个页面失效(page fault)功能,操作系统找到一个最少使用的页帧,使之失效,并把它写入磁盘,随后把需要访问的页放到页帧中,并修改页表中的映射,保证了所有的页都会被调度。
FileChannel 提供了 map 方法把文件映射到虚拟内存:
// 只保留了核心代码
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
// allocationGranularity一般等于64K,它是虚拟内存的分配粒度,由操作系统指定
// 这里将position与分配粒度取余,然后真实映射起始位置为mapPosition = position-pagePosition,position 是参数指定的 position,pagePosition是根据内存分配粒度取余的结果,最终算出映射起始地址,这样算是为了内存对齐
// 这样无论position为多少,得出的各个MappedByteBuffer实例之间的内存都是成块对齐的
// 对齐的好处:如果两个不同的MappedByteBuffer,即便它们的position不同,但是只要它们有公共映射区域的话,这些公共区域在物理内存上的分页会被共享
// 如果它们的MapMode是PRIVATE的话,那么会copy-on-write的方式来对修改内容进行私有化
// 而如果它们的MapMode是SHARED的话,那么对映射的修改,其他实例均可见
// 实际上,上述的过程都是内核来做的,我们要做的只是调用map0时将对齐好的position输入即可,这实际上是map0下层使用的mmap系统调用的约束
int pagePosition = (int)(position % allocationGranularity);
long mapPosition = position - pagePosition;
long mapSize = size + pagePosition;
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException y) {
Thread.currentThread().interrupt();
}
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError y) {
// After a second OOME, fail
throw new IOException(“Map failed”, y);
}
}
int isize = (int)size;
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAPRO)) {
return Util.newMappedByteBufferR(isize,
addr + pagePosition,
mfd,
um);
} else {
return Util.newMappedByteBuffer(isize,
addr + pagePosition,
mfd,
um);
}
}
上述代码可以看出: 1. map 通过 native 函数 map0 完成文件的映射工作,下层使用系统调用 mmap 2. 如果第一次文件映射导致 OOM,则手动触发垃圾回收,休眠 100ms 后再次尝试映射,如果失败,则抛出异常。 3. 如果映射成功,会得到虚拟内存地址 address 4. 根据得到的虚拟内存地址,通过 newMappedByteBuffer 方法初始化 MappedByteBuffer 实例,其最终返回的是 DirectByteBuffer,如下就是从内存地址生成 DirectByteBuffer 实例的过程。
static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd, Runnable unmapper) {
MappedByteBuffer dbb;
if (directByteBufferConstructor == null)
initDBBConstructor();
dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
new Object[] { new Integer(size),
new Long(addr),
fd,
unmapper }
return dbb;
}
// 访问权限_
private static void initDBBConstructor() {
AccessController.doPrivileged(new PrivilegedAction
public Void run() {
Class<?> cl = Class.forName(“java.nio.DirectByteBuffer”);
Constructor<?> ctor = cl.getDeclaredConstructor(
new Class<?>[] { int.class,
long.class,
FileDescriptor.class,
Runnable.class });
ctor.setAccessible(true);
directByteBufferConstructor = ctor;
}});
}
由于 FileChannelImpl 和 DirectByteBuffer 不在同一个包中,所以有权限访问问题,通过 AccessController 类获取 DirectByteBuffer 的构造器进行实例化。
map0() 函数返回一个虚拟内存地址 address,这样就无需调用 read 或 write 方法对文件进行读写,通过 address 就能够操作文件。底层采用 unsafe.getByte 方法,通过(address + 偏移量)获取指定内存的数据。 - 第一次访问 address 所指向的内存区域,导致缺页中断,中断响应函数会在交换区中查找相对应的页面,如果找不到(也就是该文件从来没有被读入内存的情况),则从硬盘上将文件指定页读取到物理内存中(非 jvm 堆内存)。 - 如果在拷贝数据时,发现物理内存不够用,则会通过虚拟内存机制(swap)将暂时不用的物理页面交换到硬盘的虚拟内存中。
MappedByteBuffer 的效率之所以比 read/write 高,主要是因为 read/write 过程会涉及到用户内存拷贝到内核缓冲区,而 MappedByteBuffer 在发生缺页中断时,直接将硬盘内容拷贝到了用户内存,这也就是我们所说的零拷贝技术。所以,采用内存映射的读写效率要比传统的 read/write 性能高。
MappedByteBuffer 使用虚拟内存,因此分配 (map) 的内存大小不受 JVM 的 - Xmx 参数限制,但是也是有大小限制的。如果当文件超出大小限制 Integer.MAX_VALUE 时,可以通过 position 参数重新 map 文件后面的内容。
至此,我们已经了解了文件内存映射的技术,既然 Java 已经提供了内存映射的方案,还有 MappedFile 什么事呢?这一层封装又有何意义呢?接下来再回到 MappedFile 的介绍中来,我将详细介绍 RocketMQ 的 MappedFile 都对原生内存映射方案做了哪些增强。
初始化
在不开启 RocketMQ 的内存映射增强方案时,它会规规矩矩地使用 Java 的 MappedByteBuffer。
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
// 文件名即是起始偏移量
this.fileFromOffset = Long.parseLong(this.file.getName());
ensureDirOK(this.file.getParent());
// 通过RandomAccessFile构建NIO Channel然后通过Channel::map获得mappedByteBuffer,这就是文件内存映射,
this.fileChannel = new RandomAccessFile(this.file, “rw”).getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READWRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
通过 RandomAccessFile 创建读写文件通道,并将文件内容使用 NIO 的内存映射将文件映射到内存中,最后得到的就是 MappedByteBuffer 实例。随后介绍数据存储的时候,你就会发现在不开启 RocketMQ 内存映射优化时,它都是对 mappedByteBuffer 进行写入和刷盘。
我们知道,MappedByteBuffer 已经很快了,已经是零拷贝了,还有什么可以优化的呢?在前面对 MappedByteBuffer 的介绍中,我们知道它实际上使用的是虚拟内存,当虚拟内存的使用超过物理内存大小时,势必会造成内存交换,这就会导致在内存使用的过程中进行磁盘 IO,而且它不一定是顺序磁盘 IO,所以会很慢。而且虚拟内存的交换是由操作系统控制的,系统中的其他进程活动,也会触发 RocketMQ 内存映射的内存交换。此外,因为文件内存映射的写入过程实际上是写入 PageCache,这就涉及到 PageCache 的锁竞争,而如果直接写入内存的话就不存在该竞争,在异步刷盘的场景下可以达到更快的速度。综上 RocketMQ 就对其进行了优化,该优化使用 transientStorePoolEnable 参数控制。
如果 transientStorePoolEnable 为 true,则初始化 MappedFile 的 writeBuffer,该 buffer 从 transientStorePool 中获取。
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
那么 TransientStorePool 中拿到的 buffer 和 MappedByteBuffer 又有什么区别呢?这就得看看 transientStorePool 的代码了。
// TransientStorePool初始化过程
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); // 加锁后,该内存就不会发生交换_
availableBuffers.offer(byteBuffer);
}
}
从的代码,我们可以看出该内存池的内存实际上用的也是直接内存,把要存储的数据先存入该 buffer 中,然后再需要刷盘的时候,将该 buffer 的数据传入 FileChannel,这样就和 MappedByteBuffer 一样能做到零拷贝了。除此之外,该 Buffer 还使用了 com.sun.jna.Library 类库将该批内存锁定,避免被置换到交换区,提高存储性能。
至此,我们已经知道了 RocketMQ 根据配置的不同,可能会使用来自 TransientStorePool 的 writeBuffer 或者 MappedByteBuffer 来存储数据,接下来,我们就来看一看存储数据的过程是如何实现的。
MappedFile 插入数据
这里所指的插入数据,是在内存层面将要存储的数据加入到 MappedFile 的 Buffer 中,核心实现逻辑在 appendMessagesInner: ```java public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error(“MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}”, currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} ``` 从第八行我们可以看到,如果 writeBuffer 不为空,说明使用了 TransientStorePool,则使用 writeBuffer 作为写入时使用的 buffer,否则使用 mappedByteBuffer。然后根据当前的写指针 wrotePosition 设置 buffer 的 position,而实际的写入过程在 AppendMessageCallback::doAppend 中。写入完成后更新写指针 wrotePosition 和存储时间戳。
slice() 方法创建一个共享缓存区,与原先的 ByteBuffer 共享内存但维护一套独立的指针 (position、mark、limit)。
MappedFile 提交
MappedFile 提交实际上是将 writeBuffer 中的数据,传入 FileChannel,所以只有在 transientStorePoolEnable 为 true 时才有实际作用:
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log.warn(“in commit, hold failed, commit offset = “ + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
commitLeastPagesTransientStorePool 为本次提交最小的页数,如果待提交数据不满 commitLeastPages,则不执行本次提交操作,待下次提交。writeBuffer 如果为空,直接返回 wrotePosition 指针,无须执行 commit 操作,正如前面所说,commit 操作主体是 writeBuffer。
private boolean isAbleToFlush(final int flushLeastPages) {
int flush = this.flushedPosition.get();
int write = getReadPosition();
if (this.isFull()) {
return true;
}
if (flushLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush;
}
判断是否执行 commit 操作。如果文件己满返回 true; 如果 commitLeastPages 大于 0, 则比较 wrotePosition(当前 writeBuffer 的写指针)与上一次提交的指针 (committedPosition) 的差值,除以 OS_PAGE_SIZE 得到当前脏页的数量,如果大于 commitLeastPages 则返回 true; 如果 commitLeastPages 小于 0 表示只要存在脏页就提交。
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error(“Error occurred when commit data to FileChannel.”, e);
}
}
}
具体的提交实现。首先创建 writeBuffer 的共享缓存区,然后将新创建的 buffer position 回退到上一次提交的位置 (committedPosition),设置 limit 为 wrotePosition(当前最大有效数据指针),然后把 committedPosition 到 wrotePosition 的数据复制(写入) 到 FileChannel 中,然后更新 committedPosition 指针为 wrotePosition,commit 的作用就是将 writeBuffer 中的数据提交到文件通道 FileChannel 中, CommitLog 在采用异步存储方式时,会有一个后台任务循环的进行 commit 操作,如果进行同步存储,也会主动调用 MappedFile 的 commit,随后再调用 flush 刷盘。
MappedFile 刷盘
刷盘指的是将内存中的数据刷写到磁盘,永久存储在磁盘中,其具体实现由 MappedFile 的 flush 方法实现,如下所示。
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
**try** {<br /> _//We only append data to fileChannel or mappedByteBuffer, never both._<br /> **if** (writeBuffer != null || **this**.fileChannel.position() != 0) {<br /> **this**.fileChannel.force(false);<br /> } **else** {<br /> **this**.mappedByteBuffer.force();<br /> }<br /> } **catch** (Throwable e) {<br /> log.error("Error occurred when force data to disk.", e);<br /> }
**this**.flushedPosition.**set**(**value**);<br /> **this**.release();<br /> } **else** {<br /> log.warn("in flush, hold failed, flush offset = " + **this**.flushedPosition.**get**());<br /> **this**.flushedPosition.**set**(getReadPosition());<br /> }<br /> }<br /> **return** **this**.getFlushedPosition();<br />}<br />flush 函数和 commit 一样也可以传入一个刷盘页数,当脏页数量达到要求时,会进行刷盘操作,如果使用 writeBuffer 存储的话则调用 fileChannel 的 force 将内存中的数据持久化到磁盘,刷盘结束后,flushedPosition 会等于 committedPosition,否则调用 mappedByteBuffer 的 force,最后 flushedPosition 会等于 writePosition。<br />我们不妨分析一下 wrotePosition,committedPosition,flushedPosition 的关系,当有新的数据要写入时,先会写入内存,然后 writePosition 代表的就是内存写入的末尾,commit 过程只有 transientStorePoolEnable 为 true 时才有意义,代表的是从 writeBuffer 拷贝到 FileChannel 时,拷贝数据的末尾,而 flushedPosition 则代表将内存数据刷盘到物理磁盘的末尾。<br />综上所述,我们可以得到一个关于这三个 position 之间的关系: - transientStorePoolEnable: flushedPosition<=committedPosition<=wrotePosition - MappedByteBuffer only: flushedPosition<=wrotePosition
获取 MappedFile 最大读指针
RocketMQ 文件的一个组织方式是内存映射文件,预先申请一块连续的固定大小的内存,需要一套指针标识当前最大有效数据的位置,获取最大有效数据偏移量的方法由 MappedFile 的 getReadPosition 方法实现,如下所示。
/**
@return The max position which have valid data
/
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
获取当前文件最大的可读指针。如果 writeBuffer 为空,则直接返回当前的写指针; 如果 writeBuffer 不为空,则返回上一次提交的指针。在 MappedFile 设计中,只有提交了的数据 (写入到 MappedByteBuffer 或 FileChannel 中的数据) 才是安全的数据。为什么没刷盘之前也认为是安全数据呢,这就和 MappedByteBuffer 和 FileChannel 的写入机制有关了,无论是 MappedByteBuffer 还是 FileChannel 在写入数据时,实际上只是将数据写入 PageCache,而操作系统会自动的将脏页刷盘,这层 PageCache 就是我们应用和物理存储之间的夹层,当我们将数据写入 PageCache 后,即便我们的应用崩溃了,但是只要系统不崩溃,最终也会将数据刷入磁盘。所以,RocketMQ 以写入 PageCache 作为数据安全可读的判断标准。
读取数据
RocketMQ 在读数据时,使用的是 MappedByteBuffer,并且以最大读指针作为可读数据的末尾。之所以使用 MappedByteBuffer 而不是 FileChannel 主要是因为它更快,这一点在后面的各种流速度对比中就能看到。
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();
if ((pos + size) <= readPosition) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn(“matched, but hold failed, request pos: “ + pos + “, fileFromOffset: “
+ this.fileFromOffset);
}
} else {
log.warn(“selectMappedBuffer request pos invalid, request pos: “ + pos + “, size: “ + size
+ “, fileFromOffset: “ + this.fileFromOffset);
}
return null;
}
MappedFile 销毁
为了保证 MappedFile 在销毁的时候,不对正在进行的读写造成影响,所以 MappedFile 实际上还是一个计数引用资源,每当要进行读写操作时,都需要调用其 hold 函数,当使用完成后需要主动调用 release 函数释放资源。
// ReferenceResource
// 默认引用数为1,当需要销毁时调用release将其减为0,最后释放资源
protected final AtomicLong refCount = new AtomicLong(1);
// 标识资源是否可用(未被销毁)
protected volatile boolean available = true;
// 每当持有资源时,引用数加一,如果发现已经不可用就回退,这里用双层检验保证线程安全:1.isAvailable()2.this.refCount.getAndIncrement() > 0
public synchronized boolean hold() {
if (this.isAvailable()) {
if (this.refCount.getAndIncrement() > 0) {
return true;
} else {
this.refCount.getAndDecrement();
}
}
return false;
}
// 释放资源,如果引用数小于0,则开始销毁逻辑
public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
// 主动触发销毁过程,实际上会调用 release 函数来进行销毁,这里如果销毁失败,会在每次尝试销毁时,按照一定的时间间隔,将引用数-1000来强制进行销毁。
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
MappedFile 的销毁就是通过调用 ReferenceResource 的 shutdown 来实现的,实际上 MappedFile 是 ReferenceResource 的子类,并实现了其 cleanup 函数。综上所述,MappedFile 的销毁过程就是:MappedFile::destroy -> ReferenceResource::shutdown -> ReferenceResource::release -> MappedFile::cleanup。
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info(“close file channel “ + this.fileName + “ OK”);
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info(“delete file[REF:” + this.getRefCount() + “] “ + this.fileName
+ (result ? “ OK, “ : “ Failed, “) + “W:” + this.getWrotePosition() + “ M:”
+ this.getFlushedPosition() + “, “
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn(“close file channel “ + this.fileName + “ Failed. “, e);
}
return true;
} else {
log.warn(“destroy mapped file[REF:” + this.getRefCount() + “] “ + this.fileName
+ “ Failed. cleanupOver: “ + this.cleanupOver);
}
return false;
}
MappedByteBuffer 的释放过程实际上有些诡异,Java 官方没有提供公共的方法来进行 MappedByteBuffer 的回收,所以不得不通过反射来进行回收,这也是 MappedByteBuffer 比较坑的一点,我们不妨简单看下 MappedFile 的 cleanup 逻辑。
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
log.error(“this file[REF:” + currentRef + “] “ + this.fileName
+ “ have not shutdown, stop unmapping.”);
return false;
}
if (this.isCleanupOver()) {
log.error(“this file[REF:” + currentRef + “] “ + this.fileName
+ “ have cleanup, do not do it again.”);
return true;
}
clean(this.mappedByteBuffer);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize (-1));
TOTAL_MAPPED_FILES.decrementAndGet();
log.info(“unmap file[REF:” + currentRef + “] “ + this.fileName + “ OK”);
return true;
}
public static void clean(final ByteBuffer buffer) {
if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
*return;
invoke(invoke(viewed(buffer), “cleaner”), “clean”);
}
private static Object invoke(final Object target, final String methodName, final Class<?>… args) {
return AccessController.doPrivileged(new PrivilegedAction