如何合适选择重试机制


情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试
总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿

  1. spring:
  2. rabbitmq:
  3. ####连接地址
  4. host: 127.0.0.1
  5. ####端口号
  6. port: 5672
  7. ####账号
  8. username: guest
  9. ####密码
  10. password: guest
  11. ### 地址
  12. virtual-host: /admin_host
  13. listener:
  14. simple:
  15. retry:
  16. ####开启消费者(程序出现异常的情况下会)进行重试
  17. enabled: true
  18. ####最大重试次数
  19. max-attempts: 5
  20. ####重试间隔次数
  21. initial-interval: 3000
  22. server:
  23. port: 8081
  1. // rabbitmq 默认情况下 如果消费者程序出现异常的情况下,会自动实现补偿机制
  2. // 补偿(重试机制) 队列服务器 发送补偿请求
  3. // 如果消费端 程序业务逻辑出现异常消息会消费成功吗?
  4. @RabbitListener(queues = "fanout_email_queue")
  5. public void process(String msg) throws Exception {
  6. System.out.println("邮件消费者获取生产者消息msg:" + msg);
  7. JSONObject jsonObject = JSONObject.parseObject(msg);
  8. String email = jsonObject.getString("email");
  9. String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  10. System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl);
  11. JSONObject result = HttpClientUtils.httpGet(emailUrl);
  12. // 如果调用第三方邮件接口无法访问,如何实现自动重试.
  13. if (result == null) {
  14. throw new Exception("调用第三方邮件服务器接口失败!");
  15. }
  16. System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束");
  17. }
  18. // @RabbitListener 底层 使用Aop进行拦截,如果程序没有抛出异常,自动提交事务
  19. // 如果Aop使用异常通知拦截 获取异常信息的话,自动实现补偿机制 ,该消息会缓存到rabbitmq服务器端进行存放,一直重试到不抛异常为准。
  20. // 修改重试机制策略 一般默认情况下 间隔5秒重试一次

消费者如果保证消息幂等性,不被重复消费


产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

解决办法:
使用全局MessageID判断消费方使用同一个,解决幂等性。

  1. @Component
  2. public class FanoutProducer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(String queueName) {
  6. String msg = "my_fanout_msg:" + System.currentTimeMillis();
  7. Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
  8. .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  9. System.out.println(msg + ":" + msg);
  10. amqpTemplate.convertAndSend(queueName, message);
  11. }
  12. }

这个是全局ID与签收模式的混搭

    // MQ重试机制需要注意的问题
    // MQ消费者幂等性问题如何解决:使用全局ID
    //其实这里可以应用redis或者数据来实现
    @RabbitListener(queues = "fanout_email_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
        // 重试机制都是间隔性


        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("email");
        String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
        System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl);
        JSONObject result = HttpClientUtils.httpGet(emailUrl);
        // 如果调用第三方邮件接口无法访问,如何实现自动重试.
        if (result == null) {
            throw new Exception("调用第三方邮件服务器接口失败!");
        }
        System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束");
        // 手动ack
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手动签收
        channel.basicAck(deliveryTag, false);

    }
    // 默认是自动应答模式

RabbitMQ环境搭建.docx上课代码.zip