高级发布确认
1.交换机故障 投递消息的目标交换机不存在或Broker宕机。2.路由错误 RoutingKey错误或者队列不存在。3.如果消息投递失败找不到交换机会异常会回调被捕捉到确认失败,如果路由失败消息回退。4.备份交换机和备份队列,警告队列可以处理投递到交换机但是路由错误的消息。
ConfirmConfig
/* * 发布确认 - 高级 */@Configurationpublic class ConfirmConfig { //交换机 private static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //队列 private static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //RoutingKey private static final String CONFIRM_ROUTING_KEY = "key1"; //备份交换机 private static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; //备份队列 private static final String BACKUP_QUEUE_NAME = "backup.queue"; //警告队列 private static final String WARNING_QUEUE_NAME = "warning.queue"; //声明交换机 @Bean public DirectExchange confirmExchange() { /* * 构建交换机如果消息无法找到目标队列就投递给备份交换机 * 备份交换机优先级高于消息回退 */ return ExchangeBuilder .directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) .build(); } //声明队列 @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //绑定 @Bean public Binding queueBindingExchange() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()) .with(CONFIRM_ROUTING_KEY); } //声明备份交换机 @Bean public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //声明备份队列 @Bean public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } //声明警告队列 @Bean public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } //绑定 @Bean public Binding backupQueueBindingBackExchange() { return BindingBuilder.bind(backupQueue()).to(backupExchange()); } @Bean public Binding warningQueueBindingBackExchange() { return BindingBuilder.bind(warningQueue()).to(backupExchange()); }}
MyCallback
# 默认禁用发布确认模式spring.rabbitmq.publisher-confirm-type=correlated#开启消息路由投递失败回退spring.rabbitmq.publisher-returns=true
@Componentpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { /* * 交换机确认回调方法 * 1.发送成功 * 2.发送失败 * correlationData:保存回调消息的ID及其相关信息 * ack:交换机收到消息为true * cause:失败原因,成功为null */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //失败回调 if (ack) { System.out.println("消息发送成功id为" + correlationData.getId()); } else { System.err.println("消息发送失败原因:" + cause + "id为" + correlationData.getId()); } } /* * 当消息投递过程中不可达(只有失败才会回调)时将消息返回给生产者 * message:消息 * replyCode:失败码 * exchange:交换机 * routingKey:路由Key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("message:" + new String(message.getBody())); System.err.println("replayCode:" + replyCode); System.err.println("replayText:" + replyText); System.err.println("exchange:" + exchange); System.err.println("routingKey:" + routingKey); } //注入 public MyCallBack(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); }}
Provider
@Slf4j@Controller@RequestMapping("/confirm")public class ProducerController { private RabbitTemplate rabbitTemplate; public ProducerController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } //发消息 @GetMapping("/msg/{message}") public void sendMessage(@PathVariable("message") String message) { //CorrelationData correlationData = new CorrelationData("123"); //rabbitTemplate.convertAndSend("no.confirm.exchange", "", message, correlationData); rabbitTemplate.convertAndSend("confirm.exchange", "key1-x", message); log.info("发送消息内容为{}", message); }}
Consumer
@Slf4j@Componentpublic class ConfirmConsumer { @RabbitListener(queues = "confirm.queue") public void receive(Message message) { log.info("接收到来自队列confirm.queue的消息:{}", new String(message.getBody())); }}
WarningConsumer
@Slf4j@Componentpublic class WarningConsumer { @RabbitListener(queues = "warning.queue") public void receiveWarningMsg(Message message) { log.info("发现投递失败消息{}", new String(message.getBody())); }}
幂等性
同一操作发起的一次请求或多次请求的结果都是一致的,不会因为点击多次而产生副作用,消息被重复消费。消费者在消费MQ的时候,MQ已经把消息发送给消费者了,消费者在MQ返回ACK的时候网络不可达,MQ未收到ACK,再一次把该条消息发送给消费者,或者在网络重连后发送给消费者,造成消费者消费同一条消息两次。解决思路全局ID或者写一个唯一标识时间戳,或者按照自己规则生成全局唯一ID每次消费判断消息是否已经消费过了。1.唯一ID + 指纹码(一些规则,时间戳,服务器给的唯一信息码,能够保证唯一性)2.Redis原子性(利用Redis执行setnx天然具有幂等性,从而实现不重复消费)
优先级队列
具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。
PriorityQueueConfig
@Configurationpublic class PriorityQueueConfig { private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange"; private static final String PRIORITY_QUEUE_NAME = "priority.queue"; @Bean public DirectExchange priorityExchange() { return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).build(); } @Bean public Queue priorityQueue() { Map<String, Object> arguments = new HashMap<>(); //设置队列优先级为0-10 arguments.put("x-max-priority", 10); return QueueBuilder .durable(PRIORITY_QUEUE_NAME) .withArguments(arguments) .build(); } @Bean public Binding priorityQueueBindingPriorityExchange() { return BindingBuilder.bind(priorityQueue()).to(priorityExchange()).with(""); }}
Provider
@Slf4j@Controller@RequestMapping("/priority")public class PriorityController { private RabbitTemplate rabbitTemplate; public PriorityController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @GetMapping("/{message}") public void test(@PathVariable("message") String message) { //参数输入String-Integer String[] split = message.split("-"); rabbitTemplate.convertAndSend("priority.exchange", "", split[0], msg -> { msg.getMessageProperties().setPriority(Integer.parseInt(split[1])); return msg; }); log.info("发送消息内容为{}", message);}
Consumer
@Slf4j@Componentpublic class PriorityConsumer { @RabbitListener(queues = "priority.queue") public void receive(Message message) { log.info("消费消息内容为{}", new String(message.getBody())); }}
惰性队列
一般消息队列默认将放在内存中,惰性队列默认将小写存放在磁盘上,如果消费者下线了造成队列消息积压,采用惰性队列对消息进行持久化存储,创建队列有default和lazy两种模式,最大优势在于内存开销小。arguments.put("x-queue-mode","lazy")