1.前言

如果想要详细的了解一个消息中间件,那么详细的了解其消息的发布与消费流程是必不可少的一部分。
通过详细的了解消息的发布与消费过程,以及其中涉及到的多个机制,可以让我们对消息中间件的理解更加透彻
接下来就开始RabbitMq消息发布与消费流程的学习

2.消息的构成与相关属性

消息的种类

RabbitMq中消息分为两种

  1. 持久化消息(Durable Message):消息被发布后会持久化到Broker的硬盘当中。从而确保消息不会应为Broker重启而丢失,消息安全性更高。但是在发布确认模式下,吞吐量会受影响
  2. 瞬态消息(Transient Message):消息被发布后存在于内存中,只有当内存不足时,才会写入磁盘。相对来说可能会造成消息的丢失,在发布确认模式下,吞吐量比持久化消息会高一些

    消息的构成

    与Kafka类似,RabbitMq中消息的存储同样分为索引文件与消息文件两部分,其中索引文件为每个队列独占,而消息文件是所有队列共享

    索引文件(rabbit_queue_index)

    负责维护队列中落盘消息的信息,包括消息的存储地点、是否已经被交付给消费者、是否已被消费者ack等,每个队列都有一个与之对应的rabbit_queue_index。
    索引文件的格式为分段存储,文件编号从0开始,文件后缀为.idx

    消息文件(rabbit_msg_store)

    rabbit_msg_store以键值对的形式存储消息,每个节点有且只有一个,所有队列共享。rabbit_msg_store又可以分为msg_store_persistent与msg_store_transient

  3. msg_store_persistent:负责持久化消息的存储,不会丢失

  4. msg_store_transient:负责非持久化消息的存储,重启后消息会丢失。

消息文件的格式同样为分段存储,文件编号从0开始,文件后缀为.rdq
rdq文件消息格式:<>

  • MsgId:RabbitMQ通过rabbit_guid:gen()为每一个消息生成的GUID,
  • MsgBody:包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式。

    消息的状态

    RabbitMq中消息的状态大致分为3种
  1. Ready:消息已经持久化成功,等待被消费
  2. Unacked:消息已经被Consumer获取,但是还没有被Consumer确认消费成功。如果在客户端断开链接后消息依然没有被确认,那么消息会重新进入Unacked状态
  3. Ack:消息被Consumer确认消费成功

    3.消息的发布

    流程

  4. Publisher与Broker建立TCP链接

  5. Publisher中的线程在TCP链接中打开一个Channel
  6. Publisher发布消息
  7. Broker接收到消息
  8. Broker解析消息中的信息,将消息发送到对应的Visual Host处理
  9. Broker继续解析消息中的信息,将消息发送到对应的Exchange
  10. Exchange通过特定的路由匹配规则或转发规则,将消息发送到对应的Queue

至此,消息发送到了对应的Queue中

时序图

yuque_diagram.jpg

发布确认

作用

经过上述两步,消息已经写入发送到了对应的Queue中,那么还有一个问题,Publisher如何知道消息是否发送成功了呢?这就需要一个所谓的发布确认机制了(类似于Kafka的akcs应答机制)
RabbitMq默认是关闭发布确认的,也就是说,Publisher发出消息后即认为消息发布成功
而在开启了发布确认机制后,Publisher在每个打开的Channel中发布的每一条消息,对有一个对应的消息ID,从1开始递增.
而Broker在发送或持久化完消息后,会给Publisher一个响应ack,响应ack中包含消息的唯一ID。Publisher就可以根据消息ID进行后续的处理。需要注意的是,当Broker内部发生错误导致消息发布失败时,会返回一个nack,Publisher同样可以进行异常消息的处理

时机

不可路由的消息

在Exchange验证确认了消息不会被发送到任何队列后,触发发布确认

可路由的消息
  1. 无镜像队列,无队列持久化与消息持久化:在消息发送到交换机需要转发的全部队列后,触发发布确认机制
  2. 无镜像队列,有队列持久化与消息持久化:在消息发送到交换机需要转发的全部队列,并且完成持久化后,触发发布确认机制
  3. 有镜像队列,有队列持久化与消息持久化:在消息发送到交换机需要转发的全部队列,并且本节点队列和所有镜像节点队列全部完成持久化后,触发发布确认机制

    方式

    同步确认

    同步确认可以大致分为以下两种

  4. 单条消息的同步确认:代码简单,对吞吐量影响较大,但是可以确保每一条消息都发送成功,失败时便于异常定位

  5. 批量消息同步确认:代码简单,对吞吐量影响较小,失败时无法确定具体哪一条消息发送失败,需要将一批消息同时重发
    异步确认
    代码相对来说比较复杂,对吞吐量影响很小,并且可以对每一条消息进行成功或失败的回调操作。是应用最多的一种方式

    代码

    发布确认相关代码如下 ```java

//1.开启发布确认 channel.confirmSelect();

//2.同步发布确认 channel.waitForConfirms();

//3.异步发布确认 ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> { // TODO do something }; ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> { // TODO do something }; channel.addConfirmListener(ackCallback, nackCallback); ```

4.消息的持久化

在消息确认发布到Broker以后,就需要进行消息的持久化操作了。接下来对消息的持久化操作做一个详细的解读

交换机与队列的持久化

消息是基于队列存储的,而队列是需要与交换机绑定的。
因此,为了实现队列的持久化,RabbitMq中还需要确保持久化的消息,被持久化的交换机,发送到了持久化的队列中
交换机与队列需要将durable 设置为true,从而开启持久化功能
image.png

image.png

RabbitMq队列原理

队列架构

在了解了消息的种类与组成之后,接下来了解消息是如何持久化的。首先第一步,需要先了解下RabbitMq中队列的架构
image.png
如图,在RabbitMQ中,MessageQueue主要由两部分组成

  1. AMQPQueue:实现AMQP协议的逻辑功能,包括接收消息,投递消息,Confirm消息等
  2. BackingQueue:提供AMQPQueue调用的接口,完成消息的存储和持久化工作

    队列中消息的生命状态

    BackingQueue由Q1、Q2、Delta、Q3、Q4五个子队列构成
    在BackingQueue中,消息的生命周期有4个状态:
queue state message message index
q1、q4 alpha RAM RAM
q2、q3 beta DISK RAM
q2、q3 gamma DISK RAM&DISK
delta delta DISK DISK

状态说明:

  1. alpha:消息的内容和消息索引都在内存中
  2. beta:消息的内容保存在硬盘上,消息索引保存在内存中
  3. gamma:消息的内容保存在硬盘上,消息索引保存在硬盘和内存中
  4. delta:消息内容和索引都保存在硬盘上

    原理

  5. 消息首先会被写入到内存当中,当内存负载超过阈值后,消息会被写入到硬盘

  6. 而当内存负载降下来后,消息又会被重新加载到内存当中

由此可以看到,无论是瞬态消息还是持久化消息,本身都会经历RAM->DISK->RAM的过程。而引起状态变化的原因,大致分为两种

  1. 内存不足:内存不足时,消息会被持久化到硬盘,以便释放内存空间
  2. 消息被消费:当消息被消费且确认消费成功(ack)后,消息从内存中删除,内存得到释放,未被消费或未被确认消费成功(ack)的消息会被重新加载到内存

    消息的持久化时机

    RabbitMq中消息的文件是要持久化到硬盘的,众所周知,所以基于硬盘存储的中间件其数据本身,为了提高并发效率,都会优先持久化到内存,后续再持久化到硬盘,RabbitMq同样如此。
    RabbitMq中消息的持久化时机大致为如下几种

  3. 待写入磁盘缓冲区数据量>=1M时,消息数据会被刷新到磁盘

  4. 距离上次刷盘时间>25ms时,消息数据会被刷新到磁盘

    RabbitMq的流控-信用机制

    目的

    上文中了解到,消息的持久化,会经历RAM->DISK->RAM的过程。而这个过程本身,对于RabbitMq的吞吐量是会造成一定影响的。毕竟内存的读写效率远远高于硬盘的读写效率。
    同时为了避免Publisher与Consumer的消息处理效率不对等,造成大量消息积压从而引发内存溢出,导致Broker挂掉。RabbitMq设置了一套流控-信用机制来预防这个问题发生

    机制

    当RabbitMQ出现内存或者磁盘资源达到阈值时,会触发流控机制。
    阻塞Publisher的Connection,让Publisher不能继续发送消息,直到消息被持续消费,确保内存使用率下降到阈值以下,才会允许Publisher继续发送消息。从而避免了过量的消息积压导致的内存溢出问题

    5.消息的消费

    消息的获取模式

    RabbitMq中支持push和pull两种消息获取模式,其中默认为push模式。即Broker主动将消息推送给Consumer

    消息的应答机制

    在消息push给Consumer后,Consumer开始消费这条消息,那么,Broker如何获取消息的消费结果,以便后续的处理呢?

    自动应答

    默认的应答模式即消息推送给Consumer后,即认为消息消费成功,消息的状态会从Ready直接变为Ack。且Broker会删除该消息
    自动应答模式下,如果Consumer在消息未消费完成时,出现宕机或重启的情况,会造成消息的丢失
    因此,自动应答适用于处理数据安全性不高,但对吞吐量要求高的场景

    手动应答

    手动应答分为3种

  5. channel.basicAck(long deliveryTag, boolean multiple):确认应答,代表消息消费成功。应答后消息的状态从Unacked变为Ack,Broker会删除该消息

  6. channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定应答,代表消息消费失败,消息的状态从Unacked变为Ready,Broker会保留该消息,等待消息被再次消费
  7. channel.basicReject(long deliveryTag, boolean requeue():否定应答,代表Consumer拒绝该消息,Broker会删除该消息

上述方法中的相关参数介绍如下:

  1. deliveryTag:消息的标志
  2. multiple:是否批量应答
  3. requeue:是否重新进入队列,true的情况下消息会重新入队,false的情况下消息不会重新入队。如果配置了死信队列,消息会进入到死信队列中

    预取值

    概念

    经过上文可以了解到,消息从推送给消费者,到确认消费完成,实际上是经历了3个状态的变更
    Ready -> Unacked -> Ack
    而消息的推送与消息的确认,本质上都是一个异步的过程。因此就需要一个缓冲区,来存放那些状态为Unacked的消息。
    而这个缓冲区允许存放Unacked消息的最大数量,就是所谓的预取值,即channel.basicQos()

    作用

    预取值机制是为了保护Consumer,当Consumer的消费效率跟不上Publisher的生产效率,如果没有一个中间的缓存区,那么就会造成消息在Consumer大量堆积,最终造成Consumer程序宕机或崩溃
    因此,通过设置预取值的大小,来设置Unacked消息缓冲区的大小。当Unacked消息的数量达到缓冲区允许存放Unacked消息的最大数量时,Broker就不会在向Consumer推送消息。
    直到至少一个消息被ack确认,消息才会重新推送给Consumer。阻塞时积压的消息会由Broker暂存。同时Broker会通过流控-信用机制来保护自身

    不公平分发

    当一个队列存在多个Consumer时,Broker默认通过轮询的方式将消息推送给Consumer,从而实现Consumer端的负载均衡。
    而轮询方式在多个Consumer消费效率不同时,会导致存在部分Consumer空闲,而部分Consumer过于忙碌的情况。
    为了进一步保护消费效率低的Consumer,可以将那部分Consumer的预取值设置为1,确保Consumer消费完1个消息并ack确认后,才会重新消费另一条消息。

    6.消息的删除

    RabbitMq中,Consumer消费完消息并回复ack响应后,Broker同时也会将消息删除
    RabbitMq中的删除是逻辑删除,实际上只是删除了ETS(Erlang Term Storge)表中的记录,而具体的消息文件会被后台进程在后续的时间进行合并、删除
    消息的物理删除会在以下情况下发生

  4. 所有消息文件中的被删除消息比率>=50%且消息文件>=3个时,会触发一个消息文件的合并操作。类似于ElasticSearch中的Segment合并。删除时通常会锁定两个相邻文件,对前面文件的数据做有效整理,同时将后面文件的有效数据写入前面的文件,最后删除后面的文件。然后会更新ETS(Erlang Term Storge)表中的记录

    7.参考

  5. RabbitMQ超详细学习笔记

  6. RabbitMq读写流程