基础概念

消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息, Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。 ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异 步方式均需要Broker返回确认信息,单向发送不需要。生产者中,会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息且发送 逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组 的其他生产者实例以提交或回溯消费。

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提 供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

  • 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控 制。一旦获取了批量消息,应用就会启动消费过程。
  • 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息 且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的 是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

    主题(Topic)

    表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
    同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是生产者发送消息与消费者消费消息的最小单位。

    代理服务器(Broker Server)

    消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的 消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

  • Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快 速查询。

而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:

  1. 普通集群:

这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。 slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同 步和异步同步。
注:这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。

  1. Dledger高可用集群:

Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。
Dledger技术做的事情:1、接管Broker的CommitLog消息存储 2、从集群中选举出master节点 3、完成master节点往slave节点的消息同步。

名字服务(Name Server)

名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找 各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。 RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过 Message ID和Key查询消息的功能。
并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

消息存储

何时存储消息

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
1. MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。
2. MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。
3. MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。

消息存储介质

RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具。

磁盘保存文件慢吗?

磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

零拷贝技术加速文件读写

DMA 技术,也就是直接内存访问(Direct Memory Access 技术。
在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,而 CPU 不再参与任何与数据搬运相关的事情,这样 CPU 就可以去处理别的事务。

传统的拷贝是从内核态的磁盘到磁盘的缓冲区,再到用户态,再从用户态写到内核态网络缓冲区,再写到网卡,频繁的切换效率比较低。
RocketMQ高级原理 - 图1
首先,期间共发生了 4 次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是read() ,一次是write(),每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。
其次,还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的

所以,要想提高文件传输的性能,就需要减少「用户态与内核态的上下文切换」和「内存拷贝」的次数

优化后的零拷贝,就是不再把缓冲区的内容拷贝到用户态,而是直接从用户态映射到缓冲区,再在内核态中从一个缓冲区拷贝到另一个缓冲区。
RocketMQ高级原理 - 图2
从 Linux 内核2.4版本开始起,又进行了优化,直接从磁盘缓冲区拷贝到网卡中,少了一次缓冲区间的拷贝。
RocketMQ高级原理 - 图3

消息存储结构

RocketMQ消息的存储分为三个部分:

  • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog 由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
  • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前 MessageQueue被哪些消费者组消费到了哪一条CommitLog。
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来 查找消息的方法不影响发送与消费消息的主流程。

整体的消息存储结构如下图:
image.png

刷盘机制

RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘
image.png

  • 同步刷盘:

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻 通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功 的状态。

  • 异步刷盘:

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大; 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

  • 配置方式:

刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成 SYNC_FLUSH、ASYNC_FLUSH中的 一个。

消息主从复制

如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到 Slave上。而消息复制的方式分为同步复制和异步复制。

  • 同步复制:

同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。

  • 异步复制:

异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给 Slave节点。在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成 复制,就会造成数据丢失。

  • 配置方式:

消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。

负载均衡

Producer负载均衡

Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于 MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
image.png
同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式广播模式
image.png

消息重试

首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。
而对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:

  • 返回Action.ReconsumeLater-推荐
  • 返回null
  • 抛出异常

    1. public class MessageListenerImpl implements MessageListener {
    2. @Override
    3. public Action consume(Message message, ConsumeContext context) {
    4. //处理消息
    5. doConsumeMessage(message);
    6. //方式1:返回 Action.ReconsumeLater,消息将重试
    7. return Action.ReconsumeLater;
    8. //方式2:返回 null,消息将重试
    9. return null;
    10. //方式3:直接抛出异常, 消息将重试
    11. throw new RuntimeException("Consumer Message exceotion");
    12. }
    13. }

    如果希望消费失败后不重试,可以直接返回Action.CommitMessage。

    1. public class MessageListenerImpl implements MessageListener {
    2. @Override
    3. public Action consume(Message message, ConsumeContext context) {
    4. try {
    5. doConsumeMessage(message);
    6. } catch (Throwable e) {
    7. //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
    8. return Action.CommitMessage;
    9. }
    10. //消息处理正常,直接返回 Action.CommitMessage;
    11. return Action.CommitMessage;
    12. }
    13. }

    重试消息如何处理

    重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。
    image.png然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
    image.png
    重试次数:
    如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列
    另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
    然后关于这个重试次数,RocketMQ可以进行定制。例如通过 consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
    关于MessageId:
    在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。 但是在4.7.1版本中,每次重试MessageId都会重建。
    配置覆盖:
    消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。

    死信队列

    当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ 就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列
    死信队列的名称是%DLQ%+ConsumGroup
    image.png
    死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。

  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

消息幂等

幂等的概念

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多一次:每条消息最多只会被消费一次
  • at least once 至少一次:每条消息至少会被消费一次
  • exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。
其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保 证。
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ 只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证 消息的幂等性。

消息幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会 收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投 递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费 者可能会收到重复消息。

处理方式

从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中 是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的 Key来进行传递。