1.队列的基础类型

经典队列(classic)

默认模式,经典的队列模式,支持镜像队列模式,支持消息类型的选择,支持多种队列属性

仲裁队列(quorom)

RabbitMQ 3.8新增的队列属性,特点大致包括如下几点

  1. 默认存在5个副本,1个Master副本,4个Follow副本
  2. 在消息在超过半数的副本持久化成功后,才会给Publisher发布确认响应,主副本的选举同样也需要超过半数节点的投票才会当选
  3. 副本间数据异步同步,而不是镜像队列的同步模式,非阻塞所以吞吐量更高
  4. 消息数据全部持久化到内存当中

其他请参考:https://bbs.huaweicloud.com/blogs/364672

流式队列(stream)

RabbitMQ 3.9新增的队列属性,对标的是Kafka的流式数据处理
参考:https://www.jdon.com/56866

2.死信队列

什么是死信、死信队列

Rabbit中,常规的消息流程是

  1. Publisher发送消息到Broker
  2. Broker将消息传递给对应的VirtualHost
  3. VirtualHost会继续解析消息,将消息发送给对应的Exchange
  4. Exchange根据路由规则将消息转发到对应的队列中
  5. Queue会将消息推送给Consumer进行消费,Consumer消费完成后回复ack消费确认
  6. Queue将消息从队列中删除

然而存在一些异常情况,导致消息在Queue持久化后,无法被Consumer消费。这一类型的消息,称之为死信。存放死信消息的队列,称为死信队列

死信队列的实现原理以及前置条件

实现原理

实现原理如下图,将死信交换机通过死信bindind_key绑定在常规队列上。
当触发一些异常条件时,消息会被发送给死信交换机,并且将死信交换机与常规队列绑定的binding_key作为消息routing_key。死信交换机按照其路由匹配规则,将死信消息发送到死信队列
可以设置相关的死信队列消费者专门进行死信队列的消费,也可以不设置。并且死信交换机可以为任意种类的交换机
image.png

前置条件

这里以直连交换机作为死信交换机为例

  1. 创建死信交换机、死信队列
  2. 将死信队列绑定到死信交换机上,并设置绑定key
  3. 创建常规交换机、常规队列
  4. 设置常规队列的x-dead-letter-exchange属性为死信交换机,x-dead-letter-routing-key属性为死信队列与死信交换机绑定key
  5. 将常规队列绑定到常规交换机上

    死信消息产生的条件

    以下所有条件的都需依赖上文的前置条件

    未在规定时间消费

    如果消息没有在规定时间内被消费,那么未被消费的消息将会进入到死信队列
    需要设置队列ttl属性(队列中消息的过期时间,单位ms)

    消息确认异常

    如果消息消费确认异常或被拒绝,且requeue参数为false(代表消息不回重新入队),那么被拒绝的消息将会进入到死信队列

    队列已满

    队列已满,无法继续接收消息。如果后续还有消息发送,队列前面的消息将会进入到死信队列(这里需要注意的是,是队列前面的消息进入,而不是后来的消息进入)
    需要设置队列maxLength属性(队列允许存储消息数量的最大值)

    应用场景

    延迟队列

    可以利用队列的ttl属性与死信队列结合,将死信队列改造为延迟队列。具体做法如下

  6. 创建常规队列与常规交换机,两者绑定

  7. 创建死信队列与死信交换机,两者绑定
  8. 将死信交换机与常规队列绑定
  9. 设置常规队列过期时间
  10. 常规队列不设置消费者,死信队列设置消费者

这样,消息进入常规队列后,因为没有消费者消费。在超过ttl时间后,消息会进入死信队列,然后由死信队列的消费者进行消费。这样死信队列与常规队列结合,就实现了延迟队列的功能

订单超时取消

商城类的项目,超时订单的处理是一个不得不面对的问题。rabbitMq中的死信队列就是其中的方案之一。具体如下如下

  1. 与延迟队列一样,先将常规队列与延迟队列结合,改造为一个延迟队列
  2. 用户下单后,发送一条订单消息到常规队列
  3. 如果用户正常支付,则变更订单状态为已支付
  4. 规定的消息超时时间(实际上就是订单超时时间)后,消息被发送到死信队列
  5. 获取消息的订单信息查询订单状态
  6. 如果订单状态为已支付,则该订单为正常订单,流程结束即可
  7. 如果订单状态为未支付,代表规定时间内,该订单未完成支付。可以视为订单已失效,变更订单状态为已失效,流程结束

    3.延迟队列

    什么是延迟队列

    延迟队列,顾名思义,即消息可以在指定的时间后推送给消费者的队列。RabbitMq中并没有专门的延迟队列,延迟队列的实现大致需要三种方式

    实现条件

    死信队列+队列ttl

    实现方法

    具体方法参考上文即可,设置了常规队列的x-message-ttl属性后,即可实现消息的延迟消费。本质上是消息从一个队列被重发到了死信队列

    死信队列+消息ttl

    设置队列ttl的形式,在面对需要将粒度细化到消息级别的需求时,便有些捉襟见肘了。毕竟不能存在一个时间的消息,就创建一个队列。此时就需要用到消息级别的ttl
    在RabbitMQ 3.0.0以后的版本中,TTL 设置可以具体到每一条 message 本身。

    实现方法

    前置条件与死信队列+队列ttl的一致,区别就是在发送消息时,不是设置队列的ttl属性,而是设置消息的expiration(过期时间属性,单位ms)属性,即可给消息设置一个过期时间。这样消息在规定时间没有被消费后,即可被发送到死信队列

    注意点
  8. 如果同时设置了队列和消息的ttl,那么两者之中数值小的会生效

  9. 这种方式有一个弊端,当发送的第一条消息过期时间为10s,而第二条消息过期为2s时,实际上第一条消息会在10s后被消费,第二条消息也会在10s后被消费!!!!根本原因是这种实现方法的原理还是依赖于队列这种结构,而队列的本质就是先入先出,即使设置了超时时间,也不能违背这个原则

    延迟队列插件

    RabbitMq中同样可以使用延迟队列插件,来实现。插件的安装方法参考:RabbitMq部署

    实现方法
  10. 如下图,在创建交换机时,指定交换机种类为延迟队列交换机,并设置交换机类型(x-delayed-type)

image.png

  1. 创建队列绑定在延迟交换机上
  2. 发送消息时,指定消息delay属性(单位ms)即可

    注意点

    这种方式不会出现死信队列+消息ttl那种先入先出的问题。如果发送第一条消息的过期时间为10s,第二条
    消息的过期时间为2s,那么第二条消息会优先推送给消费者

    应用场景

  3. 未支付订单的取消

  4. 定时通知
  5. 定时删除

凡是涉及到定时处理的场景,理论上都可以考虑使用RabbitMq中的延迟队列实现

4.优先级队列

什么是优先级队列

理论上来说队列中的消息,要遵循先入先出的场景 。但是有时候,在一些场景中,需要对一些消息进行加急处理。
RabbitMq中提供了这样一种队列,可以设置消息的优先级,优先级高的消息,即使晚入队,仍然可以优先推送个消费者,这种队列成为优先级队列

实现条件

  1. 设置队列的优先级属性(x-max-priority):取值范围为0~255,设置的范围越大,对内存的消耗越高。
  2. 设置发送消息的优先级属性(priority):如果不设置消息的优先级属性,优先级会失效。不设置默认为0
  3. 消息积压:如果Consumer的消费效率极高,不会造成消息积压,那么即使满足了上述两条,仍然无法实现优先性。因为每一条消息都会及时推送给Consumer的话,就不存在优先级这么一说了。优先级存在的前提条件就是多个,就一个还挑什么挑?

    应用场景

  4. 会员制模式下的订单处理,会员优先处理

  5. 加急消息的处理,可以给加急的消息设置高优先级,这样加急的消息就会优先处理

    5.惰性队列

    什么是惰性队列

    通过RabbitMq消息的发布与消费原理可以了解到,RabbitMq队列中持久化的消息会优先存储在内存当中,即使是持久化消息,也会在内存中备份一份。因为毕竟内存的读写效率是要明显由于硬盘的。
    而一旦内存不足,内存中的数据就会持久化到硬盘以释放内存,这一步是很耗费性能的,也是很影响RabbitMq吞吐量的
    然而,有这样一些场景,比如消息量极大,每天多达百万条甚至千万条,或者Publisher会瞬时性的发布巨量的消息(例如运维告警的场景),而Consumer的消费效率跟不上Producer的生产效率,并且对吞吐量没有极高的要求。那么这种时候,将消息保存在内存,是否还是一个很好的选择?
    很明显不是的,在上述的场景下,将消息存在硬盘似乎是一种更好的选择,因为硬盘是没有存储限制的,无论体量多大的消息,只要硬盘支持,都能够完成持久化。并且在对吞吐量没有极高的要求下,将数据从硬盘读取到内存,也是可以接受的
    这个时候,就需要一种队列,能够将消息直接持久化到硬盘,这种队列,称为惰性队列

    实现方法

    惰性队列实现方法比较简单,设置队列x-queue-mode属性即可。
    x-queue-mode包含两个取值

  6. default:默认值,默认模式,消息会优先在内存中存储

  7. lazy:惰性模式,消息会直接加载到硬盘,后续推送给Consumer消费的时候,会被重新加载到内存当中并推送给Consumer

如下图,创建消息时,指定x-queue-mode即可
image.png

应用场景

  1. 消息量级极大:每天达到百万甚至千万级别,如果全部优先在内存中加载,很明显是对内存资源很大的消耗
  2. 内存空间紧张:没有足够的内存空间存储巨量的消息,需要消息优先在硬盘上加载,而不是内存
  3. 吞吐量要求不高:对吞吐量没有很高的要求,可以接收消息数据从硬盘加载到内存带来的性能消耗