导图

语雀内容
初始化队列的时候设置持久化。
provider

  • 配置confirm模式,并编写confirm函数
  • 发送消息的时候 ,设置 deliveryMode为持久化

consumer

  • 设置ack模式
  • 接收消息时,手动ack

代码示例

参考:https://blog.csdn.net/Weixiaohuai/article/details/94961012

不同版本代码可能不太一样,差别不大,例如在传递id的时候需要封装的参数有修改。

下面通过一个示例说明在RabbitMQ中实现发送确认和消费者确认的使用方法。

【a】引入pom.xml依赖:

  1. 1. <dependency>
  2. 2. <groupId>org.springframework.boot</groupId>
  3. 3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. 4. </dependency>

【b】配置文件application.yml:

  1. server:
  2. port: 6666
  3. spring:
  4. application:
  5. name: mq-message-confirm2
  6. rabbitmq:
  7. host: 127.0.0.1
  8. virtual-host: /vhost
  9. username: wsh
  10. password: wsh
  11. port: 5672
  12. #消息发送确认回调
  13. publisher-confirms: true
  14. #指定消息确认模式为手动确认
  15. listener:
  16. simple:
  17. acknowledge-mode: manual
  18. #发送返回监听回调
  19. publisher-returns: true

这里需要注意两点:

  • spring.rabbitmq.publisher-confirms = true
  • spring.rabbitmq.listener.simple.acknowledge-mode = manual

【c】自定义消息发送、返回回调监听类

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.support.CorrelationData;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. import java.nio.charset.StandardCharsets;
  10. /**
  11. * @Description 自定义消息发送确认的回调
  12. * @Author weishihuai
  13. * @Date 2019/6/27 10:42
  14. * <p>
  15. * 实现接口:implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
  16. * ConfirmCallback:只确认消息是否正确到达交换机中,不管是否到达交换机,该回调都会执行;
  17. * ReturnCallback:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行;
  18. */
  19. @Component
  20. public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  21. private static final Logger logger = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. /**
  25. * PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
  26. */
  27. @PostConstruct
  28. public void init() {
  29. //指定 ConfirmCallback
  30. rabbitTemplate.setConfirmCallback(this);
  31. //指定 ReturnCallback
  32. rabbitTemplate.setReturnCallback(this);
  33. }
  34. /**
  35. * 消息从交换机成功到达队列,则returnedMessage方法不会执行;
  36. * 消息从交换机未能成功到达队列,则returnedMessage方法会执行;
  37. */
  38. @Override
  39. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  40. logger.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode
  41. + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
  42. }
  43. /**
  44. * 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;
  45. * 如果消息正确到达交换机,则该方法中isSendSuccess = true;
  46. */
  47. @Override
  48. public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
  49. logger.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());
  50. if (isSendSuccess) {
  51. logger.info("confirm回调方法>>>消息发送到交换机成功!");
  52. } else {
  53. logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);
  54. }
  55. }
  56. }

注意这里我同时也实现了RabbitTemplate.ReturnCallback返回回调接口,并且重写了returnedMessage()方法,返回回调主要指的是:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行returnedMessage()。

【d】RabbitMQ配置信息

  1. /**
  2. * @Description: RabbitMQ配置信息,绑定交换器、队列、路由键设置
  3. * @author: weishihuai
  4. * @Date: 2019/6/27 10:38
  5. * <p>
  6. * 说明:
  7. * <p>
  8. * 1. 声明Exchange交换器;
  9. * 2. 声明Queue队列;
  10. * 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键;
  11. */
  12. @Component
  13. public class RabbitMQConfig {
  14. private static final String EXCHANGE_NAME = "message_confirm_exchange";
  15. private static final String QUEUE_NAME = "message_confirm_queue";
  16. private static final String ROUTING_KEY = "user.#";
  17. @Bean
  18. private TopicExchange topicExchange() {
  19. return new TopicExchange(EXCHANGE_NAME);
  20. }
  21. @Bean
  22. private Queue queue() {
  23. return new Queue(QUEUE_NAME);
  24. }
  25. @Bean
  26. private Binding bindingDirect() {
  27. return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);
  28. }
  29. }

【e】发送者:这里发送了三条消息,如果三条消息中某条消息已经被拒绝过一次,那么触发basicNack()重新回到队列中,如果该消息再次被拒绝,那么消费者将会调用basicReject()直接拒绝该条消息,以后也不会再次接收该消息。

  1. /**
  2. * @Description: 生产者
  3. * @author: weishihuai
  4. * @Date: 2019/6/27 10:39
  5. */
  6. @Component
  7. public class Producer {
  8. private static final Logger logger = LoggerFactory.getLogger(Producer.class);
  9. private static final String EXCHANGE_NAME = "message_confirm_exchange";
  10. private static final String ROUTING_KEY = "user.add.submit";
  11. @Autowired
  12. public RabbitTemplate rabbitTemplate;
  13. public void sendMessage() {
  14. for (int i = 1; i <= 3; i++) {
  15. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  16. logger.info("【Producer】发送的消费ID = {}", correlationData.getId());
  17. String msg = "hello confirm message" + i;
  18. logger.info("【Producer】发送的消息 = {}", msg);
  19. rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);
  20. }
  21. }
  22. }

【f】消费者1:这里模拟了在处理消息的时候触发一个空指针异常,用于触发拒绝某个消息。

  1. import com.rabbitmq.client.Channel;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. /**
  9. * @Description: 消费者1
  10. * @author: weishihuai
  11. * @Date: 2019/6/27 10:42
  12. */
  13. @Component
  14. public class Consumer01 {
  15. private static final Logger logger = LoggerFactory.getLogger(Consumer01.class);
  16. @RabbitListener(queues = "message_confirm_queue")
  17. public void receiveMessage01(String msg, Channel channel, Message message) throws IOException {
  18. try {
  19. // 这里模拟一个空指针异常,
  20. String string = null;
  21. string.length();
  22. logger.info("【Consumer01成功接收到消息】>>> {}", msg);
  23. // 确认收到消息,只确认当前消费者的一个消息收到
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  25. } catch (Exception e) {
  26. if (message.getMessageProperties().getRedelivered()) {
  27. logger.info("【Consumer01】消息已经回滚过,拒绝接收消息 : {}", msg);
  28. // 拒绝消息,并且不再重新进入队列
  29. //public void basicReject(long deliveryTag, boolean requeue)
  30. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  31. } else {
  32. logger.info("【Consumer01】消息即将返回队列重新处理 :{}", msg);
  33. //设置消息重新回到队列处理
  34. // requeue表示是否重新回到队列,true重新入队
  35. //public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  36. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  37. }
  38. e.printStackTrace();
  39. }
  40. }
  41. }

【g】消费者2:该消费者为正常消费消息。

  1. import com.rabbitmq.client.Channel;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. /**
  9. * @Description: 消费者2
  10. * @author: weishihuai
  11. * @Date: 2019/6/27 10:42
  12. */
  13. @Component
  14. public class Consumer02 {
  15. private static final Logger logger = LoggerFactory.getLogger(Consumer02.class);
  16. @RabbitListener(queues = "message_confirm_queue")
  17. public void receiveMessage02(String msg, Channel channel, Message message) throws IOException {
  18. try {
  19. logger.info("【Consumer02成功接收到消息】>>> {}", msg);
  20. // 确认收到消息,只确认当前消费者的一个消息收到
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22. } catch (Exception e) {
  23. if (message.getMessageProperties().getRedelivered()) {
  24. logger.info("【Consumer02】消息已经回滚过,拒绝接收消息 : {}", msg);
  25. // 拒绝消息,并且不再重新进入队列
  26. //public void basicReject(long deliveryTag, boolean requeue)
  27. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  28. } else {
  29. logger.info("【Consumer02】消息即将返回队列重新处理 :{}", msg);
  30. //设置消息重新回到队列处理
  31. // requeue表示是否重新回到队列,true重新入队
  32. //public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  33. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  34. }
  35. e.printStackTrace();
  36. }
  37. }
  38. }

【h】启动项目,查看运行结果

rabbitMq消息确认机制 - 图1
rabbitMq消息确认机制 - 图2
rabbitMq消息确认机制 - 图3

由控制台结果可见,hello confirm message1这条消息第一次被consumer1拒绝了一次,执行basicNack重新回到队列,第二次又被判断为该条消息已经回滚过,调用basicReject方法又被拒绝并且禁止重新回到队列,这样该条消息将不会被消费者重新消费。
hello confirm message2这条消息成功被consumer2消费掉,hello confirm message3这条消息第一次也被 consumer1拒绝了,但是在重新回到队列之后,被consumer2成功消费了。
同时可以看到,三条消息都正确从发送者到达交换机,所以都执行了 confirm(CorrelationData correlationData, boolean isSendSuccess, String error)回调方法。
因为消息都成功从交换机正确到达队列中,所有监听的returnCallback的returnedMessage()方法并没有被执行。下面我们测试一下,假设指定一个binding key不匹配的。修改下面的路由键,让消息无法从交换机正确路由到队列上:

rabbitMq消息确认机制 - 图4

首先在RabbitMQ管理控制台将之前的user.#绑定键解除绑定:
rabbitMq消息确认机制 - 图5

重新启动项目,查看控制台日志:

rabbitMq消息确认机制 - 图6

可见,三条消息都未能正确从交换机路由到队列,所以都执行了returnedMessage回调方法。

下面我们测试一下消息从发送者未能正确到达交换机的情况,这里主要修改一个不存在的交换机名称,这样消息就不能正确到达消费者监听队列所在的交换机message_confirm_exchange,从而触发confirmCallback中发送失败的情况,error为错误原因。

rabbitMq消息确认机制 - 图7

控制台日志:

  1. 1. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 01ee6352-861e-4d96-8346-d808508ae3d2
  2. 2. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]
  3. 3. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 0a6e426a-c93a-4f53-9f1d-0375b8f61a41
  4. 4. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]
  5. 5. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: d11ad471-3a4c-4bfe-804b-eda966c4df51
  6. 6. 2019-07-07 11:22:27.762 INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]