高级发布确认

  1. 1.交换机故障
  2. 投递消息的目标交换机不存在或Broker宕机。
  3. 2.路由错误
  4. RoutingKey错误或者队列不存在。
  5. 3.如果消息投递失败找不到交换机会异常会回调被捕捉到确认失败,如果路由失败消息回退。
  6. 4.备份交换机和备份队列,警告队列可以处理投递到交换机但是路由错误的消息。

ConfirmConfig

  1. /*
  2. * 发布确认 - 高级
  3. */
  4. @Configuration
  5. public class ConfirmConfig {
  6. //交换机
  7. private static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  8. //队列
  9. private static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. //RoutingKey
  11. private static final String CONFIRM_ROUTING_KEY = "key1";
  12. //备份交换机
  13. private static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
  14. //备份队列
  15. private static final String BACKUP_QUEUE_NAME = "backup.queue";
  16. //警告队列
  17. private static final String WARNING_QUEUE_NAME = "warning.queue";
  18. //声明交换机
  19. @Bean
  20. public DirectExchange confirmExchange() {
  21. /*
  22. * 构建交换机如果消息无法找到目标队列就投递给备份交换机
  23. * 备份交换机优先级高于消息回退
  24. */
  25. return ExchangeBuilder
  26. .directExchange(CONFIRM_EXCHANGE_NAME)
  27. .durable(true)
  28. .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
  29. .build();
  30. }
  31. //声明队列
  32. @Bean
  33. public Queue confirmQueue() {
  34. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  35. }
  36. //绑定
  37. @Bean
  38. public Binding queueBindingExchange() {
  39. return BindingBuilder.bind(confirmQueue()).to(confirmExchange())
  40. .with(CONFIRM_ROUTING_KEY);
  41. }
  42. //声明备份交换机
  43. @Bean
  44. public FanoutExchange backupExchange() {
  45. return new FanoutExchange(BACKUP_EXCHANGE_NAME);
  46. }
  47. //声明备份队列
  48. @Bean
  49. public Queue backupQueue() {
  50. return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
  51. }
  52. //声明警告队列
  53. @Bean
  54. public Queue warningQueue() {
  55. return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
  56. }
  57. //绑定
  58. @Bean
  59. public Binding backupQueueBindingBackExchange() {
  60. return BindingBuilder.bind(backupQueue()).to(backupExchange());
  61. }
  62. @Bean
  63. public Binding warningQueueBindingBackExchange() {
  64. return BindingBuilder.bind(warningQueue()).to(backupExchange());
  65. }
  66. }

MyCallback

  1. # 默认禁用发布确认模式
  2. spring.rabbitmq.publisher-confirm-type=correlated
  3. #开启消息路由投递失败回退
  4. spring.rabbitmq.publisher-returns=true
  1. @Component
  2. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  3. /*
  4. * 交换机确认回调方法
  5. * 1.发送成功
  6. * 2.发送失败
  7. * correlationData:保存回调消息的ID及其相关信息
  8. * ack:交换机收到消息为true
  9. * cause:失败原因,成功为null
  10. */
  11. @Override
  12. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  13. //失败回调
  14. if (ack) {
  15. System.out.println("消息发送成功id为" + correlationData.getId());
  16. } else {
  17. System.err.println("消息发送失败原因:" + cause + "id为" + correlationData.getId());
  18. }
  19. }
  20. /*
  21. * 当消息投递过程中不可达(只有失败才会回调)时将消息返回给生产者
  22. * message:消息
  23. * replyCode:失败码
  24. * exchange:交换机
  25. * routingKey:路由Key
  26. */
  27. @Override
  28. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  29. System.err.println("message:" + new String(message.getBody()));
  30. System.err.println("replayCode:" + replyCode);
  31. System.err.println("replayText:" + replyText);
  32. System.err.println("exchange:" + exchange);
  33. System.err.println("routingKey:" + routingKey);
  34. }
  35. //注入
  36. public MyCallBack(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate) {
  37. rabbitTemplate.setConfirmCallback(this);
  38. rabbitTemplate.setReturnCallback(this);
  39. }
  40. }

Provider

  1. @Slf4j
  2. @Controller
  3. @RequestMapping("/confirm")
  4. public class ProducerController {
  5. private RabbitTemplate rabbitTemplate;
  6. public ProducerController(RabbitTemplate rabbitTemplate) {
  7. this.rabbitTemplate = rabbitTemplate;
  8. }
  9. //发消息
  10. @GetMapping("/msg/{message}")
  11. public void sendMessage(@PathVariable("message") String message) {
  12. //CorrelationData correlationData = new CorrelationData("123");
  13. //rabbitTemplate.convertAndSend("no.confirm.exchange", "", message, correlationData);
  14. rabbitTemplate.convertAndSend("confirm.exchange", "key1-x", message);
  15. log.info("发送消息内容为{}", message);
  16. }
  17. }

Consumer

  1. @Slf4j
  2. @Component
  3. public class ConfirmConsumer {
  4. @RabbitListener(queues = "confirm.queue")
  5. public void receive(Message message) {
  6. log.info("接收到来自队列confirm.queue的消息:{}", new String(message.getBody()));
  7. }
  8. }

WarningConsumer

  1. @Slf4j
  2. @Component
  3. public class WarningConsumer {
  4. @RabbitListener(queues = "warning.queue")
  5. public void receiveWarningMsg(Message message) {
  6. log.info("发现投递失败消息{}", new String(message.getBody()));
  7. }
  8. }

幂等性

  1. 同一操作发起的一次请求或多次请求的结果都是一致的,不会因为点击多次而产生副作用,消息被重复消费。
  2. 消费者在消费MQ的时候,MQ已经把消息发送给消费者了,消费者在MQ返回ACK的时候网络不可达,MQ未收到
  3. ACK,再一次把该条消息发送给消费者,或者在网络重连后发送给消费者,造成消费者消费同一条消息两次。
  4. 解决思路
  5. 全局ID或者写一个唯一标识时间戳,或者按照自己规则生成全局唯一ID每次消费判断消息是否已经消费过了。
  6. 1.唯一ID + 指纹码(一些规则,时间戳,服务器给的唯一信息码,能够保证唯一性)
  7. 2.Redis原子性(利用Redis执行setnx天然具有幂等性,从而实现不重复消费)

优先级队列

  1. 具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。

PriorityQueueConfig

  1. @Configuration
  2. public class PriorityQueueConfig {
  3. private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
  4. private static final String PRIORITY_QUEUE_NAME = "priority.queue";
  5. @Bean
  6. public DirectExchange priorityExchange() {
  7. return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).build();
  8. }
  9. @Bean
  10. public Queue priorityQueue() {
  11. Map<String, Object> arguments = new HashMap<>();
  12. //设置队列优先级为0-10
  13. arguments.put("x-max-priority", 10);
  14. return QueueBuilder
  15. .durable(PRIORITY_QUEUE_NAME)
  16. .withArguments(arguments)
  17. .build();
  18. }
  19. @Bean
  20. public Binding priorityQueueBindingPriorityExchange() {
  21. return BindingBuilder.bind(priorityQueue()).to(priorityExchange()).with("");
  22. }
  23. }

Provider

  1. @Slf4j
  2. @Controller
  3. @RequestMapping("/priority")
  4. public class PriorityController {
  5. private RabbitTemplate rabbitTemplate;
  6. public PriorityController(RabbitTemplate rabbitTemplate) {
  7. this.rabbitTemplate = rabbitTemplate;
  8. }
  9. @GetMapping("/{message}")
  10. public void test(@PathVariable("message") String message) {
  11. //参数输入String-Integer
  12. String[] split = message.split("-");
  13. rabbitTemplate.convertAndSend("priority.exchange", "", split[0], msg -> {
  14. msg.getMessageProperties().setPriority(Integer.parseInt(split[1]));
  15. return msg;
  16. });
  17. log.info("发送消息内容为{}", message);
  18. }

Consumer

  1. @Slf4j
  2. @Component
  3. public class PriorityConsumer {
  4. @RabbitListener(queues = "priority.queue")
  5. public void receive(Message message) {
  6. log.info("消费消息内容为{}", new String(message.getBody()));
  7. }
  8. }

惰性队列

  1. 一般消息队列默认将放在内存中,惰性队列默认将小写存放在磁盘上,如果消费者下线了造成队列消息
  2. 积压,采用惰性队列对消息进行持久化存储,创建队列有defaultlazy两种模式,最大优势在于内存开销小。
  3. arguments.put("x-queue-mode","lazy")