如何合适选择重试机制
情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试
情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试
总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /admin_host
listener:
simple:
retry:
####开启消费者(程序出现异常的情况下会)进行重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
server:
port: 8081
// rabbitmq 默认情况下 如果消费者程序出现异常的情况下,会自动实现补偿机制
// 补偿(重试机制) 队列服务器 发送补偿请求
// 如果消费端 程序业务逻辑出现异常消息会消费成功吗?
@RabbitListener(queues = "fanout_email_queue")
public void process(String msg) throws Exception {
System.out.println("邮件消费者获取生产者消息msg:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl);
JSONObject result = HttpClientUtils.httpGet(emailUrl);
// 如果调用第三方邮件接口无法访问,如何实现自动重试.
if (result == null) {
throw new Exception("调用第三方邮件服务器接口失败!");
}
System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束");
}
// @RabbitListener 底层 使用Aop进行拦截,如果程序没有抛出异常,自动提交事务
// 如果Aop使用异常通知拦截 获取异常信息的话,自动实现补偿机制 ,该消息会缓存到rabbitmq服务器端进行存放,一直重试到不抛异常为准。
// 修改重试机制策略 一般默认情况下 间隔5秒重试一次
消费者如果保证消息幂等性,不被重复消费
产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
解决办法:
使用全局MessageID判断消费方使用同一个,解决幂等性。
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
}
这个是全局ID与签收模式的混搭
// MQ重试机制需要注意的问题
// MQ消费者幂等性问题如何解决:使用全局ID
//其实这里可以应用redis或者数据来实现
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
// 重试机制都是间隔性
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl);
JSONObject result = HttpClientUtils.httpGet(emailUrl);
// 如果调用第三方邮件接口无法访问,如何实现自动重试.
if (result == null) {
throw new Exception("调用第三方邮件服务器接口失败!");
}
System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束");
// 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
}
// 默认是自动应答模式