高级发布确认
1.交换机故障
投递消息的目标交换机不存在或Broker宕机。
2.路由错误
RoutingKey错误或者队列不存在。
3.如果消息投递失败找不到交换机会异常会回调被捕捉到确认失败,如果路由失败消息回退。
4.备份交换机和备份队列,警告队列可以处理投递到交换机但是路由错误的消息。
ConfirmConfig
/*
* 发布确认 - 高级
*/
@Configuration
public class ConfirmConfig {
//交换机
private static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
//队列
private static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//RoutingKey
private static final String CONFIRM_ROUTING_KEY = "key1";
//备份交换机
private static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
//备份队列
private static final String BACKUP_QUEUE_NAME = "backup.queue";
//警告队列
private static final String WARNING_QUEUE_NAME = "warning.queue";
//声明交换机
@Bean
public DirectExchange confirmExchange() {
/*
* 构建交换机如果消息无法找到目标队列就投递给备份交换机
* 备份交换机优先级高于消息回退
*/
return ExchangeBuilder
.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
.build();
}
//声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding queueBindingExchange() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange())
.with(CONFIRM_ROUTING_KEY);
}
//声明备份交换机
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//声明备份队列
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
//声明警告队列
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding backupQueueBindingBackExchange() {
return BindingBuilder.bind(backupQueue()).to(backupExchange());
}
@Bean
public Binding warningQueueBindingBackExchange() {
return BindingBuilder.bind(warningQueue()).to(backupExchange());
}
}
MyCallback
# 默认禁用发布确认模式
spring.rabbitmq.publisher-confirm-type=correlated
#开启消息路由投递失败回退
spring.rabbitmq.publisher-returns=true
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/*
* 交换机确认回调方法
* 1.发送成功
* 2.发送失败
* correlationData:保存回调消息的ID及其相关信息
* ack:交换机收到消息为true
* cause:失败原因,成功为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//失败回调
if (ack) {
System.out.println("消息发送成功id为" + correlationData.getId());
} else {
System.err.println("消息发送失败原因:" + cause + "id为" + correlationData.getId());
}
}
/*
* 当消息投递过程中不可达(只有失败才会回调)时将消息返回给生产者
* message:消息
* replyCode:失败码
* exchange:交换机
* routingKey:路由Key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("message:" + new String(message.getBody()));
System.err.println("replayCode:" + replyCode);
System.err.println("replayText:" + replyText);
System.err.println("exchange:" + exchange);
System.err.println("routingKey:" + routingKey);
}
//注入
public MyCallBack(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
}
Provider
@Slf4j
@Controller
@RequestMapping("/confirm")
public class ProducerController {
private RabbitTemplate rabbitTemplate;
public ProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
//发消息
@GetMapping("/msg/{message}")
public void sendMessage(@PathVariable("message") String message) {
//CorrelationData correlationData = new CorrelationData("123");
//rabbitTemplate.convertAndSend("no.confirm.exchange", "", message, correlationData);
rabbitTemplate.convertAndSend("confirm.exchange", "key1-x", message);
log.info("发送消息内容为{}", message);
}
}
Consumer
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "confirm.queue")
public void receive(Message message) {
log.info("接收到来自队列confirm.queue的消息:{}", new String(message.getBody()));
}
}
WarningConsumer
@Slf4j
@Component
public class WarningConsumer {
@RabbitListener(queues = "warning.queue")
public void receiveWarningMsg(Message message) {
log.info("发现投递失败消息{}", new String(message.getBody()));
}
}
幂等性
同一操作发起的一次请求或多次请求的结果都是一致的,不会因为点击多次而产生副作用,消息被重复消费。
消费者在消费MQ的时候,MQ已经把消息发送给消费者了,消费者在MQ返回ACK的时候网络不可达,MQ未收到
ACK,再一次把该条消息发送给消费者,或者在网络重连后发送给消费者,造成消费者消费同一条消息两次。
解决思路
全局ID或者写一个唯一标识时间戳,或者按照自己规则生成全局唯一ID每次消费判断消息是否已经消费过了。
1.唯一ID + 指纹码(一些规则,时间戳,服务器给的唯一信息码,能够保证唯一性)
2.Redis原子性(利用Redis执行setnx天然具有幂等性,从而实现不重复消费)
优先级队列
具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。
PriorityQueueConfig
@Configuration
public class PriorityQueueConfig {
private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
private static final String PRIORITY_QUEUE_NAME = "priority.queue";
@Bean
public DirectExchange priorityExchange() {
return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).build();
}
@Bean
public Queue priorityQueue() {
Map<String, Object> arguments = new HashMap<>();
//设置队列优先级为0-10
arguments.put("x-max-priority", 10);
return QueueBuilder
.durable(PRIORITY_QUEUE_NAME)
.withArguments(arguments)
.build();
}
@Bean
public Binding priorityQueueBindingPriorityExchange() {
return BindingBuilder.bind(priorityQueue()).to(priorityExchange()).with("");
}
}
Provider
@Slf4j
@Controller
@RequestMapping("/priority")
public class PriorityController {
private RabbitTemplate rabbitTemplate;
public PriorityController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/{message}")
public void test(@PathVariable("message") String message) {
//参数输入String-Integer
String[] split = message.split("-");
rabbitTemplate.convertAndSend("priority.exchange", "", split[0], msg -> {
msg.getMessageProperties().setPriority(Integer.parseInt(split[1]));
return msg;
});
log.info("发送消息内容为{}", message);
}
Consumer
@Slf4j
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority.queue")
public void receive(Message message) {
log.info("消费消息内容为{}", new String(message.getBody()));
}
}
惰性队列
一般消息队列默认将放在内存中,惰性队列默认将小写存放在磁盘上,如果消费者下线了造成队列消息
积压,采用惰性队列对消息进行持久化存储,创建队列有default和lazy两种模式,最大优势在于内存开销小。
arguments.put("x-queue-mode","lazy")