生产端:

  1. 修改 spring.rabbitmq.publisher-confirms: true

    1. spring:
    2. rabbitmq:
    3. addresses: 172.28.0.132:5672
    4. username: admin
    5. password: aompMq123!
    6. virtual-host: local-zhengpei
    7. publisher-confirms: true # 开启发送端确认
    8. publisher-returns: true # 开启发送端消息抵达队列的确认
    9. template:
    10. mandatory: true # 只要抵达队列,以异步模式优先回调这个return confirm
    11. listener:
    12. simple:
    13. acknowledge-mode: manual # 手动确认消息
  2. 编写initRabbitTemplate()方法

image.png
ack为true,表示消息已经到达broker
mq配置类

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.amqp.support.converter.MessageConverter;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. @Configuration
  12. public class MyRabbitConfig {
  13. private static final Logger logger = LoggerFactory.getLogger(MyRabbitConfig.class);
  14. private RabbitTemplate rabbitTemplate;
  15. @Bean
  16. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  17. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  18. this.rabbitTemplate = rabbitTemplate;
  19. // 确认开启ConfirmCallback回调
  20. rabbitTemplate.setMandatory(true);
  21. rabbitTemplate.setMessageConverter(messageConverter());
  22. initRabbitTemplate();
  23. return rabbitTemplate;
  24. }
  25. @Bean
  26. public MessageConverter messageConverter() {
  27. return new Jackson2JsonMessageConverter();
  28. }
  29. /**
  30. * 定制RabbitTemplate
  31. * 1、服务收到消息就会回调
  32. * 1.1、spring.rabbitmq.publisher-confirms: true
  33. * 1.2、设置确认回调
  34. * 2、消息正确抵达队列就会进行回调
  35. * 2.1、spring.rabbitmq.publisher-returns: true // 开启发送端确认
  36. * 2.2 spring.rabbitmq.template.mandatory: true
  37. * 2.3、设置确认回调ReturnCallback
  38. * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
  39. */
  40. public void initRabbitTemplate() {
  41. //设置确认回调
  42. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  43. /**
  44. * 1、只要消息抵达Broker就ack=true,
  45. * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一ID)
  46. * @param ack 消息是否成功收到
  47. * @param cause 失败的原因
  48. */
  49. @Override
  50. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  51. logger.info("confirm correlationData:[{}],ack:[{}],cause:[{}]", correlationData, ack, cause);
  52. }
  53. });
  54. // 设置下消息抵达队列的确认回调
  55. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  56. /**
  57. * 只要消息没有投递给指定的队列,就触发这个失败回调
  58. * @param message 投递失败的消息详细信息
  59. * @param replyCode 回复的状态码
  60. * @param replyText 回复的文本内容
  61. * @param exchange 当时这个消息发给哪个交换机
  62. * @param routingKey 当时这个消息用哪个路邮键
  63. */
  64. @Override
  65. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  66. logger.error("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
  67. "==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
  68. }
  69. });
  70. }
  71. }

消费端:

image.png
image.png

  1. /**
  2. * @author zhengpi
  3. * @since 2021/6/6
  4. */
  5. @Component
  6. public class DirectReceiver {
  7. private static final Logger logger = LoggerFactory.getLogger(DirectReceiver.class);
  8. // @RabbitHandler 可加可不加
  9. @RabbitListener(queues = "testDirectQueue")//监听的队列名称 testDirectQueue
  10. public void process(Message message, Channel channel) {
  11. String msg = new String(message.getBody());
  12. // channel 内按照顺序内递增的
  13. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  14. logger.info("deliveryTag:[{}]", deliveryTag);
  15. // 消费消息
  16. try {
  17. //签收消息
  18. channel.basicAck(deliveryTag, false);
  19. //不签收消息
  20. // channel.basicNack(deliveryTag,false,true);
  21. } catch (Exception e) {
  22. // 网络中断
  23. }
  24. }
  25. }