1、引入 rabbitmq 依赖包

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2、修改 application.properties 配置

  1. spring.rabbitmq.host=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=root
  4. spring.rabbitmq.password=root@mq
  5. spring.rabbitmq.virtual-host=/
  6. # 发送者开启 confirm 确认机制
  7. # 监听消息是否 到达 exchange
  8. spring.rabbitmq.publisher-confirms=true
  9. # 发送者开启 return 确认机制
  10. #监听消息是否 没有到达 queue
  11. spring.rabbitmq.publisher-returns=true
  12. ####################################################
  13. # 设置消费端手动 ack
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  15. # 是否支持重试
  16. #spring.rabbitmq.listener.simple.retry.enabled=true

3、定义交换机和队列

  1. @Configuration
  2. public class QueueConfig {
  3. /**
  4. * 确认队列
  5. * @return
  6. */
  7. @Bean(name = "confirmTestQueue")
  8. public Queue confirmTestQueue() {
  9. return new Queue("confirm_test_queue", true, false, false);
  10. }
  11. /**
  12. * 交换机
  13. * @return
  14. */
  15. @Bean(name = "confirmTestExchange")
  16. public FanoutExchange confirmTestExchange() {
  17. return new FanoutExchange("confirmTestExchange");
  18. }
  19. /**
  20. * 测试绑定队列和广播交换机
  21. * @param confirmTestExchange
  22. * @param confirmTestQueue
  23. * @return
  24. */
  25. @Bean
  26. public Binding confirmTestFanoutExchangeAndQueue(
  27. @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
  28. @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
  29. return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  30. }
  31. }

4、回调确认配置

image.png

4.1、什么是确认

发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue 的过程中,消息是否成功投递。

  • 消息从 producer 到 rabbitmq broker 有一个 confirmCallback 确认模式。
  • 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。

4.2、 ConfirmCallback 确认模式

  1. @Slf4j
  2. public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
  3. /**
  4. * correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  5. * ack:消息投递到 broker 的状态,true 表示成功。
  6. * cause:表示投递失败的原因。
  7. * 消息没有到达 exchange 会回调
  8. */
  9. @Override
  10. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  11. if (!ack) {
  12. log.error("消息发送异常!");
  13. } else {
  14. log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
  15. }
  16. }
  17. }

4.3、ReturnCallback 退回模式

  1. //旧版本 RabbitTemplate.ReturnCallback
  2. @Slf4j
  3. public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
  4. /**
  5. * 如果消息未能投递到目标 queue 里将触发回调 returnCallback ,
  6. * 一旦向 queue 投递消息未成功,
  7. * 这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
  8. * @param returnedMessage
  9. */
  10. @Override
  11. public void returnedMessage(ReturnedMessage returnedMessage) {
  12. log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}"
  13. , returnedMessage.getReplyCode()
  14. , returnedMessage.getReplyText()
  15. , returnedMessage.getMessage()
  16. , returnedMessage.getRoutingKey());
  17. }
  18. }

4.4、配置 RabbitTemplate

  1. @Bean
  2. public RabbitTemplate rabbitTemplate() {
  3. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  4. //消息投递到 交换机 确认回调
  5. template.setConfirmCallback(confirmCallbackService());
  6. //消息投递到 队列 确认回调
  7. template.setReturnsCallback(returnCallbackService());
  8. return template;
  9. }

4.5、配置消息过期(可选)

  • 设置消息的过期时间

    1. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
    2. @Override
    3. public Message postProcessMessage(Message message) throws AmqpException {
    4. MessageProperties messageProperties = message.getMessageProperties();
    5. //设置单个消息过期
    6. messageProperties.setExpiration();
    7. return message;
    8. }
    9. });
  • 设置队列过期(超时之后不进入死信)

  1. @Bean
  2. public Queue newMerchantQueue(){
  3. Map<String,Object> args = new HashMap<>(3);
  4. //消息过期后,进入到死信交换机
  5. args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");
  6. //消息过期后,进入到死信交换机的路由key
  7. args.put("x-dead-letter-routing-key","lock_merchant_routing_key");
  8. //过期时间,单位毫秒
  9. args.put("x-message-ttl",10000);
  10. return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();
  11. }

5、消息监听

  1. @Slf4j
  2. @Component
  3. //监听队列
  4. @RabbitListener(queues = "confirm_test_queue")
  5. public class ReceiverMessage1 {
  6. @RabbitHandler
  7. public void processHandler(String msg, Channel channel, Message message) throws IOException {
  8. try {
  9. log.info("小富收到消息:{}", msg);
  10. //TODO 具体业务
  11. //手动确认
  12. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  13. } catch (Exception e) {
  14. if (message.getMessageProperties().getRedelivered()) {
  15. log.error("消息已重复处理失败,拒绝再次接收...");
  16. // 拒绝消息
  17. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  18. } else {
  19. log.error("消息即将再次返回队列处理...");
  20. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  21. }
  22. }
  23. }
  24. }

6、踩坑日志

6.1、不消息确认

开启消息确认机制,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。

6.2、消息无限投递

开启消息确认机制,发生异常后将消息重新投入队列, channel.basicNack 是从重新入队列头部。

  1. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

解决方案

  • 先将消息进行应答,重新发送消息到队列,此时,消息是入队尾
  • 优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入 MySQL 并推送报警,进行人工处理和定时任务做补偿。

6.3、重复消费

借助 MySQL、或者 redis 将消息持久化,通过再消息中的唯一性属性校验。

7、延迟队列实现

7.1、方式

  • 使用死信队列
  • 使用插件 ,需要安装插件
    1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

7.2、死信队列实现

7.2.1、介绍
  • RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现
    • TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
    • DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
  • 进入死信队列的情况

    • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
    • 消息TTL过期。TTL:Time To Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。
    • 队列达到最大长度

      7.2.2、声明队列

      ```java /**

      • 创建死信交换机
      • @return */ @Bean public Exchange lockMerchantDeadExchange(){ return new TopicExchange(“lock_merchant_dead_exchange”,true,false); }

      /**

      • 创建死信队列
      • @return */ @Bean public Queue lockMerchantDeadQueue(){ return QueueBuilder.durable(“lock_merchant_dead_queue”).build(); }

      /**

      • 绑定死信交换机和死信队列
      • @return */ @Bean public Binding lockMerchantBinding(){

        return new Binding(“lock_merchant_dead_queue”

        1. ,Binding.DestinationType.QUEUE,
        2. "lock_merchant_dead_exchange"
        3. ,"lock_merchant_routing_key"
        4. ,null);

        }

/**

  1. * 创建普通交换机
  2. * @return
  3. */
  4. @Bean
  5. public Exchange newMerchantExchange(){
  6. return new TopicExchange("new_merchant_exchange",true,false);
  7. }
  8. /**
  9. * 创建普通队列
  10. * @return
  11. */
  12. @Bean
  13. public Queue newMerchantQueue(){
  14. Map<String,Object> args = new HashMap<>(3);
  15. //消息过期后,进入到死信交换机
  16. args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");
  17. //消息过期后,进入到死信交换机的路由key
  18. args.put("x-dead-letter-routing-key","lock_merchant_routing_key");
  19. //过期时间,单位毫秒
  20. args.put("x-message-ttl",10000);
  21. return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();
  22. }
  23. /**
  24. * 绑定交换机和队列
  25. * @return
  26. */
  27. @Bean
  28. public Binding newMerchantBinding(){
  29. return new Binding("new_merchant_queue",
  30. Binding.DestinationType.QUEUE,
  31. "new_merchant_exchange",
  32. "new_merchant_routing_key",
  33. null);
  34. }
  1. <a name="yACOr"></a>
  2. ##### 7.2.3、监听队列
  3. ```java
  4. @Slf4j
  5. @Component
  6. //监听队列
  7. @RabbitListener(queues = "lock_merchant_dead_queue")
  8. public class DelayReceiverMessageHandler {
  9. @RabbitHandler
  10. public void processHandler(String msg, Channel channel, Message message) throws IOException {
  11. try {
  12. log.info("小富收到消息:{}", msg);
  13. //TODO 具体业务
  14. log.info("队列延迟的时间 {} ",message.getMessageProperties().getDelay());
  15. log.info("接受时间 {}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
  16. //手动确认
  17. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  18. } catch (Exception e) {
  19. if (message.getMessageProperties().getRedelivered()) {
  20. log.error("消息已重复处理失败,拒绝再次接收...");
  21. // 拒绝消息
  22. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  23. } else {
  24. log.error("消息即将再次返回队列处理...");
  25. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  26. }
  27. }
  28. }
  29. }

7.2.4、发送
  1. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
  2. @Override
  3. public Message postProcessMessage(Message message) throws AmqpException {
  4. MessageProperties messageProperties = message.getMessageProperties();
  5. //设置单个消息过期(优先级高于队列的过期时间配置)
  6. messageProperties.setExpiration("6000");
  7. return message;
  8. }
  9. });

参考