上一小节松哥和大家聊了 MQ 高可用之如何确保消息成功发送,各种配置齐上阵,最终确保了消息的成功发送,甚至在一些极端情况下还可能发生同一条消息重复发送的情况,不管怎么样,消息总算发送出去了,如果小伙伴们还没看过上篇文章,建议先看看,再来学习本文:

今天我们就来聊一聊消息消费的问题,看看如何确保消息消费成功,并且确保幂等性。

1. 两种消费思路

RabbitMQ 的消息消费,整体上来说有两种不同的思路:

  • 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
  • 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。

两种方式我都举个例子看下。

先来看推(push):

这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:

  1. @Component
  2. public class ConsumerDemo {
  3. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4. public void handle(String msg) {
  5. System.out.println("msg = " + msg);
  6. }
  7. }

当监听的队列中有消息时,就会触发该方法。

再来看拉(pull):

  1. @Test
  2. public void test01() throws UnsupportedEncodingException {
  3. Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
  4. System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
  5. }

调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。

这是消息两种不同的消费模式。

如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。

2. 确保消费成功两种思路

上篇文章中,我们想尽办法确保消息能够发送成功,对于消息消费成功,其实官方提供了相关的机制,我们一起来看下。

为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。

  1. #指定消息的消费者自动ack,此时只要RabbitMQ把这条消息发送出去,就会把这条消息从内存中移除掉,即使这条消息并没有成功地到达消费者,这条消息也会被移除。
  2. spring.rabbitmq.listener.simple.acknowledge-mode=auto
  3. #指定消息的消费者手动ack,此时消息消费者收到消息后,RabbitMQ不会立即把这个消息移除,而是会等待消费者显示地回复一个确认的信号之后,才会把消息打上一个删除的标记,然后再删除。
  4. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
  • 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者

我们来看一张图:

09-RabbitMQ消费可靠性 - 图1

如上图所示,在 RabbitMQ 的 web 管理页面:

  • Ready 表示待消费的消息数量。
  • Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
  • Ready + Unacked = Total

这是我们可以从 UI 层面观察消息的消费情况确认情况。

当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:

  • 待消费的消息
  • 已经投递给消费者,但是还没有被消费者确认的消息

换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。

综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

3. 消息拒绝

当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:

  1. @Component
  2. public class ConsumerDemo {
  3. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4. public void handle(Channel channel, Message message) {
  5. //获取消息编号
  6. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  7. try {
  8. //拒绝消息
  9. channel.basicReject(deliveryTag, true);
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }

消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:

  1. 获取消息编号 deliveryTag。
  2. 调用 basicReject 方法拒绝消息。

调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。

需要注意的是,basicReject 方法一次只能拒绝一条消息。

4. 消息确认

消息确认分为自动确认和手动确认,我们分别来看。

4.1. 自动确认

先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的。

我们来看如下一个消息消费方法:

  1. @Component
  2. public class ConsumerDemo {
  3. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4. public void handle2(String msg) {
  5. System.out.println("msg = " + msg);
  6. int i = 1 / 0;
  7. }
  8. }

通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。

代码演示:

首先创建一个Empty Project,工程名叫做mq_receive_message

09-RabbitMQ消费可靠性 - 图2

09-RabbitMQ消费可靠性 - 图3

在mq_receive_message工程下通过Spring Initializr的方式创建一个SpringBoot应用的模块,模块名叫做auto_ack

09-RabbitMQ消费可靠性 - 图4

选择依赖

09-RabbitMQ消费可靠性 - 图5

在auto_ack模块的application.properties配置文件中做如下配置:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/

在auto_ack模块的org.javaboy.auto_ack.config包下面创建RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitConfig {
  3. public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
  4. public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
  5. @Bean
  6. Binding msgBinding() {
  7. return BindingBuilder.bind(msgQueue())
  8. .to(directExchange())
  9. .with(JAVABOY_QUEUE_NAME);
  10. }
  11. @Bean
  12. DirectExchange directExchange() {
  13. return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
  14. }
  15. @Bean
  16. Queue msgQueue() {
  17. return new Queue(JAVABOY_QUEUE_NAME, true, false, false);
  18. }
  19. }

在auto_ack模块的org.javaboy.auto_ack.receiver包下面创建一个消息的消费者

  1. @Configuration
  2. public class MsgReceiver {
  3. private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
  4. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  5. public void handleMsg(String msg) {
  6. logger.info("msg:{}", msg);
  7. int i = 1 / 0;
  8. }
  9. }

在auto_ack模块的org.javaboy.auto_ack.controller包下面创建一个控制器

  1. @RestController
  2. public class HelloController {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void hello() {
  7. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello 江南一点雨");
  8. }
  9. }

启动SpringBoot应用,然后在浏览器访问http://localhost:8080/send,查看RabbitMQ的web管理页面的Queues选项卡,Unacked的值是1,并且idea的控制台一直在不停的重试。原因是因为现在设置了自动ack,消费者消费成功之后会自动的给RabbitMQ发送一条消息告诉RabbitMQ这条消息消费成功了,但是这个消费的方法里面现在抛异常了,它就会去告诉RabbitMQ,这条消息消费失败,消费失败之后RabbitMQ就会把刚才那条消息放回到队列里面去,放回到队列里面又会立马给你推送过来,推送过来有消费失败了,所以你就会看到现在这种局面“不停的消费,不停的失败”。

4.2. 手动确认

手动确认我又把它分为两种:推模式手动确认与拉模式手动确认。

首先在mq_receive_message工程下通过Spring Initializr的方式创建一个SpringBoot应用的模块,模块名叫做manual

09-RabbitMQ消费可靠性 - 图6

选择依赖

09-RabbitMQ消费可靠性 - 图7

在manual模块的application.properties配置文件中做如下配置:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/
  6. #设置消息消费者的手动确认
  7. spring.rabbitmq.listener.simple.acknowledge-mode=manual

在manual模块的org.javaboy.manual.config包下面创建RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitConfig {
  3. public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
  4. public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
  5. @Bean
  6. Binding msgBinding() {
  7. return BindingBuilder.bind(msgQueue())
  8. .to(directExchange())
  9. .with(JAVABOY_QUEUE_NAME);
  10. }
  11. @Bean
  12. DirectExchange directExchange() {
  13. return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
  14. }
  15. @Bean
  16. Queue msgQueue() {
  17. return new Queue(JAVABOY_QUEUE_NAME, true, false, false);
  18. }
  19. }

4.2.1. 推模式手动确认

在manual模块的org.javaboy.manual.receiver包下面创建一个消息的消费者

  1. @Configuration
  2. public class MsgReceiver {
  3. private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
  4. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  5. public void handleMsg(Message message, Channel channel) {
  6. //获取消息的一个标记
  7. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  8. try {
  9. //开始消息的消费
  10. byte[] body = message.getBody();
  11. String s = new String(body);
  12. logger.info("handleMsg,{}", s);
  13. int i = 1 / 0;
  14. //消费完成后,手动ack
  15. //第一个参数是消息的标记,第二个参数如果为 false,表示仅仅确认当前消息,如果为 true,表示之前所有的消息都确认消费成功
  16. channel.basicAck(deliveryTag,false);
  17. } catch (Exception e) {
  18. //手动 nack,告诉 mq 这条消息消费失败
  19. try {
  20. channel.basicNack(deliveryTag,false,true);
  21. } catch (IOException ex) {
  22. ex.printStackTrace();
  23. }
  24. }
  25. }
  26. }

将消费者要做的事情放到一个 try..catch 代码块中。

如果消息正常消费成功,则执行 basicAck 完成确认。

如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。

这里涉及到两个方法:

  • basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
  • basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。

当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题,这个松哥以后再专门写文章和大家细聊。

在manual模块的org.javaboy.manual.controller包下面创建控制器

  1. @RestController
  2. public class HelloController {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void hello() {
  7. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello 江南一点雨");
  8. }
  9. }

启动SpringBoot应用,浏览器访问http://localhost:8080/send,会发现控制台一直不停的在消费消息,原因是因为消费的时候出现异常,然后执行了Nack,Nack让这条消息重新回到队列里面。回到队列里面又立即推动过来了,然后又出异常,一直在死循环。

4.2.2. 拉模式手动确认

拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:

  1. @SpringBootTest
  2. class ManualApplicationTests {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Test
  6. void contextLoads() {
  7. long deliveryTag = 0;
  8. Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
  9. try {
  10. GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
  11. deliveryTag = getResponse.getEnvelope().getDeliveryTag();
  12. String s = new String(getResponse.getBody());
  13. System.out.println("s = " + s);
  14. int i = 1 / 0;
  15. channel.basicAck(deliveryTag, false);
  16. } catch (IOException e) {
  17. try {
  18. channel.basicNack(deliveryTag, false, true);
  19. } catch (IOException ex) {
  20. ex.printStackTrace();
  21. }
  22. }
  23. }
  24. }

这里涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。

记得把MsgReceiver类的@Component注解注释了,再去执行上面的测试方法。

5. 幂等性问题

最后我们再来说说消息的幂等性问题。

大家设想下面一个场景:

消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次(参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。种种原因导致我们在消费消息时,一定要处理好幂等性问题。

幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。

采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:

  • id-0(正在执行业务)
  • id-1(执行业务成功)

如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。

当然这只是一个简单思路供大家参考。

松哥在 vhr 项目中也处理了消息幂等性问题,感兴趣的小伙伴可以查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中。