4.1 消息何去何从

mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。另一方面,RabbitMQ提供了备份交换器(Alternative Exchange),可以用来将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

4.1.1 mandatory参数

当mandatory参数设为true时,若交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息被直接丢弃。
那么生产者如何获取没有被正确路由到合适队列的消息呢?这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。

4.1.2 immediate参数

当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
mandatory和immediate的区别

  • mandatory是如果没有找到合适的队列,则退回消息;
  • immediate是如果找到了队列,但是队列没有消费者,则退回消息。

RabbitMQ 3.0版本开始去掉了对immediate参数的支持。现在推荐通过将消息过期时间(TTL)设置为0并配合死信交换器(DLX)来处理队列没有消费者的情况。

4.1.3 备份交换器

备份交换器(Alternate Exchange),更直白的称呼是“备胎交换器”(缺省交换器)。生产者在发送消息的时候,如果不设置mandator参数(默认为false),那么消息在未被路由的情况下将会丢失。如果将mandatory设置为true,那么消息在未被路由的情况下将会被返回给生产者,但这意味着生产者必须条件ReturnListener的编程逻辑,进而让生产者代码复杂化。如果既不想复杂化生产者逻辑,又不想消息丢失,那么可以使用备份交换器,这样消息在未被路由的情况下将会被发送到备份路由器,然后发送到与其绑定的队列中。注意,消息被重新发送给备份交换器时的路由键和从生产者发出时的路由键是一样的。如果备份交换器中也找不到合适的队列,则消息还是会丢失。
在声明交换器时(调用channel.exchangeDeclare方法)添加alternate-exchange参数来将交换器声明为备份路由器,也可以通过策略(参考6.3节)的方式实现。


4.2 过期时间(TTL)

RabbitMQ可以对消息和队列设置TTL。


4.3 死信队列

DLX,全程为Deal-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到一个被称之为死信交换器的交换器中,绑定到死信交换器的队列被称之为死信队列。
消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。

    注意,成为死信的前提,是能找到一个队列。如果一个消息根本找不到队列,则会被发送到备份交换器,而非死信交换器。

DLX也是一个正常的交换器,和一般的交换器没有什么区别。
在声明队列时,通过x-dead-letter-exchange参数来为一个队列添加DLX。在发送到死信交换器时,默认的路由键就是原信息的路由键,不过可以使用x-dead-letter-routing-key参数来专门指定一个死信路由键。
image.png


4.4 延迟队列

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
延迟队列的使用场景有很多,比如:

  • 在订单系统中,一个用户下单之后通常由30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理(比如取消订单),这时就可以使用延迟队列来处理这些订单,避免不断遍历订单表来判断一个订单是否过期。
  • 用户希望通过手机远程遥控家里的只能设备在指定的时间进行工作。这时候就可以将用户指定发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面介绍的TTL和DLX模拟出延迟队列的功能
上图4-4不仅展示了死信队列的用法,同时也是延迟队列的实现原理。假设一个应用中需要将每条消息都设置为10秒的延迟,那么可以创建一个队列queue.normal,将其TTL设置为10S。然后,创建一个DLX,绑定一个队列queue.dlx。将该DLX指定为queue.normald的DLX。另一方面,消费者订阅queue.dlx这个队列,而非queue.normal。整体流程为:生产者发送消息到queue.normal中,由于没有订阅者,该消息被缓存。10秒之后该消息过期,被发送到queue.dlx中,消费者订阅了该队列,因此开始消费。


4.5 优先级队列

优先级高的消息具备优先被消费的特权。
可以通过设置队列的x-max-priority参数来设置一个队列的最大优先级。在此之后,需要在发送时在消息中 设置消息当前的优先级。


4.6 RPC实现


4.7 持久化

持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:

  • 交换器持久化(交换器重启后不丢失);
  • 队列持久化(队列重启后不丢失);
  • 消息持久化(队列里的消息重启后不丢失)。

将所有消息都设置为持久化,将严重影响RabbitMQ的性能。写入磁盘的速度比写入内存慢得多。选择消息持久化时,要在可靠性和吞吐量之间做一个权衡。不那么重要的消息就不必持久化。
注意,即便设置了这三者持久化,还是有可能出现消息丢失。一方面写磁盘本身可能只是写到缓存,没有刷盘;另一方消费者可能使用了autoAck,消息发送后服务端就ACK了,但消息处理时消费者挂了。


4.8 生产者确认

持久化可以保证消息到达服务端后不丢失。但是,当消息的生产者将消息发送出去之后,消息到底有没有正确到达服务端呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何消息给生产者的,也就是默认情况下生产者是不知道消息有没有正确到达服务器的。
RabbitMQ针对这个问题,提供了两种解决方式:

  • 事务机制(同步确认);
  • 发送方确认机制(异步确认)。

    事务机制

    RabbitMQ客户端中与事务机制相关的方法有三个:

  • channel.txSelect:用于将当前信道设置成事务模式(开启事务)。

  • channel.txCommit:提交事务。
  • channel.txRollback:回滚事务。

我们先用channel.txSelect开启事务,然后正常发布消息,接着调用channel.txCommit提交事务。如果事务提交成功,则消息一定到达了RabbitMQ。

发送方确认机制

生产者将信道设置成confirm模式。一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID。一旦消息被投递到所有匹配的队列后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列时可持久化的,那么确认消息会在消息写入磁盘之后发出。
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续 发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息, 生产者应用程序就可以在等信道返回确认的同时继续发送下条消息,当消息最终得到确认之 后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条nack (Basic.Nack )命令,生产者应用程序同样可以在回调方法中处理该nack命令。


4.9 消费端要点介绍

4.9.1 消息分发

当RabbitMQ队列拥有多个消费者时,队列收到的消息将以轮询( round-robin )的分发方式 发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且 它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消 息即可。
很多时候轮询的分发机制也不是那么优雅。因为RabbitMQ不会考虑消费者处理消息的速度。如果某些消费者任务繁重,来不及消费那么多的消 息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了 所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
那么该如何处理这种情况呢?这里就要用到 channel.basicQos(int prefetchCount) 这个方法,如前面章节所述, channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。这种机制可以类比于 TCP/IP 中的“滑动窗口”(更准确的说,流量控制)。

4.9.2 消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。
目前很多资料显示 RabbitMQ 的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何RabbitMQ高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达Broker 的前后顺序,也就无法验证消息的顺序性。

4.9.3 弃用QueueingConsumer


4.10 消息传输保障

一般消息中间件的消息 传输保障分为三个层级:

  • At most once :最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once :最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once :恰好一次。每条消息肯定会被传输一次且仅传输一次。

RabbitMQ支持其中的“最多一次”和“最少一次”。
其中“最少一次”投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要使用事务机制或发送方确认机制,来确保信息已经发送到了RabbitMQ服务端。
  2. 消息生产者需要将mandatory参数设为true,并设置备份交换器,来确保消息不会由于找不到合适的队列而被丢弃。
  3. 消息和队列都要进行持久化处理,以确保不会在服务端宕机时丢失消息。
  4. 消费者在消费消息时,需要将AutoAck设置为false,然后通过抖动确认的方式去确认已经正确地消费了消息。

“至多一次”可能导致消费端多次消费同一条消息,因此需要由消费端来保证消息处理的幂等性。