六、死信队列

6.1 死信的概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

6.2 死信的来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到mq中)
  3. 消息被拒绝(basic.reject或basic.nack)并且requeue=false

    6.3 代码实践

    6.3.1 消息TTL过期

    ```java package com.rem.rabbitmq.ee.HdeadLetterQueue;

import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rem.rabbitmq.ee.RabbitMqUtils;

import java.util.Scanner;

/**

  • 死信队列
  • 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  • 死信来源
    • 消息TTL过去
    • 队列达到最大长度
    • 消息被拒绝 *
  • @author Rem
  • @date 2021-12-27 */

public class TTLProducer {

  1. public static void main(String[] args) throws Exception {
  2. Channel channel = RabbitMqUtils.getChannel();
  3. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);
  4. System.err.println("开始发送消息...");
  5. Scanner scanner = new Scanner(System.in);
  6. while (scanner.hasNext()) {
  7. String message = scanner.next();
  8. /**
  9. * 发送消息
  10. * 设置消息过期时间 ms
  11. */
  12. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
  13. channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL, properties, message.getBytes());
  14. System.err.println("发送消息完毕" + message);
  15. }
  16. }

}

  1. ```java
  2. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.CancelCallback;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.DeliverCallback;
  7. import com.rem.rabbitmq.ee.RabbitMqUtils;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * 普通消费者-验证ttl过期
  12. *
  13. * @author Rem
  14. * @date 2021-12-27
  15. */
  16. public class ConsumerTTL15 {
  17. public static void main(String[] args) throws Exception {
  18. Channel channel = RabbitMqUtils.getChannel();
  19. //声明死信交换机
  20. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
  21. //声明死信队列
  22. channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);
  23. //死信队列绑定死信交换机与routingkey
  24. channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);
  25. /********************************************************************************/
  26. //声明普通交换机
  27. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);
  28. //声明普通队列 并且绑定死信交换机
  29. //设置参数
  30. Map<String, Object> arguments = new HashMap<>();
  31. //设置过期时间 单位ms 10s 一般都在生产方设置
  32. //arguments.put("x-message-ttl", 10000);
  33. arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);
  34. arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);
  35. channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_TTL, false, false, false, arguments);
  36. //普通交换机与普通队列通过普通routingkey 进行绑定
  37. channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_TTL, RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL);
  38. //消费回调
  39. DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));
  40. //取消消费回调 如果在消费的时候队列被删除了
  41. CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");
  42. System.out.println("消费者-NORMAL-TTL 等待消费");
  43. channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_TTL, true, deliverCallback, cancelCallback);
  44. }
  45. }
  1. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  2. import com.rabbitmq.client.CancelCallback;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import com.rem.rabbitmq.ee.RabbitMqUtils;
  6. /**
  7. * 死信消费者
  8. *
  9. * @author Rem
  10. * @date 2021-12-27
  11. */
  12. public class ConsumerDead16 {
  13. public static void main(String[] args) throws Exception {
  14. Channel channel = RabbitMqUtils.getChannel();
  15. //声明死信交换机
  16. // channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
  17. //声明死信队列
  18. // channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);
  19. //死信队列绑定死信交换机与routingkey
  20. // channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);
  21. //消费回调
  22. DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的死信消息:" + new String(message.getBody()));
  23. //取消消费回调 如果在消费的时候队列被删除了
  24. CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");
  25. System.out.println("消费者-DEAD 等待消费");
  26. channel.basicConsume(RabbitMqUtils.QUEUE_DEAD, true, deliverCallback, cancelCallback);
  27. }
  28. }

6.3.2 消息到达最大长度

  1. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rem.rabbitmq.ee.RabbitMqUtils;
  5. import java.util.Scanner;
  6. /**
  7. * 死信队列
  8. * 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  9. * <p>
  10. * 死信来源
  11. * * 消息TTL过去
  12. * * 队列达到最大长度
  13. * * 消息被拒绝
  14. *
  15. * @author Rem
  16. * @date 2021-12-29
  17. */
  18. public class MaxLengthProducer {
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitMqUtils.getChannel();
  21. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, BuiltinExchangeType.DIRECT);
  22. System.err.println("开始发送消息...");
  23. Scanner scanner = new Scanner(System.in);
  24. while (scanner.hasNext()) {
  25. String message = scanner.next();
  26. channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, RabbitMqUtils.ROUTING_KEY_NORMAL_MAXLENGTH, null, message.getBytes());
  27. System.err.println("发送消息完毕" + message);
  28. }
  29. }
  30. }
  1. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import com.rem.rabbitmq.ee.RabbitMqUtils;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * 普通消费者-验证队列最大长度
  11. *
  12. * @author Rem
  13. * @date 2021-12-27
  14. */
  15. public class ConsumerMaxLength17 {
  16. public static void main(String[] args) throws Exception {
  17. Channel channel = RabbitMqUtils.getChannel();
  18. //声明死信交换机
  19. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
  20. //声明死信队列
  21. channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);
  22. //死信队列绑定死信交换机与routingkey
  23. channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);
  24. /********************************************************************************/
  25. //声明普通交换机
  26. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, BuiltinExchangeType.DIRECT);
  27. //声明普通队列 并且绑定死信交换机
  28. //设置参数
  29. Map<String, Object> arguments = new HashMap<>();
  30. arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);
  31. arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);
  32. /**
  33. * 设置队列最大长度
  34. */
  35. arguments.put("x-max-length", 6);
  36. channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, false, false, false, arguments);
  37. //普通交换机与普通队列通过普通routingkey 进行绑定
  38. channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, RabbitMqUtils.ROUTING_KEY_NORMAL_MAXLENGTH);
  39. //消费回调
  40. DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));
  41. //取消消费回调 如果在消费的时候队列被删除了
  42. CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");
  43. System.out.println("消费者-NORMAL-MaxLength 等待消费");
  44. channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, true, deliverCallback, cancelCallback);
  45. }
  46. }

6.3.3 消息被拒绝

  1. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rem.rabbitmq.ee.RabbitMqUtils;
  5. import java.util.Scanner;
  6. /**
  7. * 死信队列
  8. * 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  9. * <p>
  10. * 死信来源
  11. * * 消息TTL过去
  12. * * 队列达到最大长度
  13. * * 消息被拒绝
  14. *
  15. * @author Rem
  16. * @date 2021-12-29
  17. */
  18. public class RejectProducer {
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitMqUtils.getChannel();
  21. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, BuiltinExchangeType.DIRECT);
  22. System.err.println("开始发送消息...");
  23. Scanner scanner = new Scanner(System.in);
  24. while (scanner.hasNext()) {
  25. String message = scanner.next();
  26. channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, RabbitMqUtils.ROUTING_KEY_NORMAL_REJECT, null, message.getBytes());
  27. System.err.println("发送消息完毕" + message);
  28. }
  29. }
  30. }
  1. package com.rem.rabbitmq.ee.HdeadLetterQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import com.rem.rabbitmq.ee.RabbitMqUtils;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * 普通消费者-验证拒绝消息
  11. *
  12. * @author Rem
  13. * @date 2021-12-27
  14. */
  15. public class ConsumerReject18 {
  16. public static void main(String[] args) throws Exception {
  17. Channel channel = RabbitMqUtils.getChannel();
  18. //声明死信交换机
  19. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
  20. //声明死信队列
  21. channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);
  22. //死信队列绑定死信交换机与routingkey
  23. channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);
  24. /********************************************************************************/
  25. //声明普通交换机
  26. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, BuiltinExchangeType.DIRECT);
  27. //声明普通队列 并且绑定死信交换机
  28. //设置参数
  29. Map<String, Object> arguments = new HashMap<>();
  30. arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);
  31. arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);
  32. /**
  33. * 设置队列最大长度
  34. */
  35. arguments.put("x-max-length", 6);
  36. channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_REJECT, false, false, false, arguments);
  37. //普通交换机与普通队列通过普通routingkey 进行绑定
  38. channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_REJECT, RabbitMqUtils.EXCHANGE_NORMAL_REJECT, RabbitMqUtils.ROUTING_KEY_NORMAL_REJECT);
  39. //消费回调
  40. DeliverCallback deliverCallback = (consumerTag, message) -> {
  41. System.out.println("接收到的消息,并且拒绝:" + new String(message.getBody()));
  42. /**
  43. * 拒绝消息
  44. * DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签
  45. * requeue – 如果被拒绝的消息应该重新排队而不是丢弃/死信,则为真
  46. */
  47. channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
  48. };
  49. //取消消费回调 如果在消费的时候队列被删除了
  50. CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");
  51. System.out.println("消费者-NORMAL-REJECT 等待消费");
  52. /**
  53. * 设置自动应答为false
  54. */
  55. boolean autoAck = false;
  56. channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_REJECT, autoAck, deliverCallback, cancelCallback);
  57. }
  58. }

七、延迟队列

使用死信队列实现延迟队列

  1. package com.rem.rabbitmq.ee.Idelay;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rem.rabbitmq.ee.RabbitMqUtils;
  6. import java.util.Scanner;
  7. /**
  8. * 延迟队列 -死信队列中ttl的实现
  9. * 设置ttl过期两种方法
  10. * ** 在队列上设置过期时间 arguments.put("x-message-ttl", 10000) 一旦消息过期就被丢弃,如果配置了死信队列就丢到死信队列
  11. * ** 在消息上设置过期时间 new AMQP.BasicProperties().builder().expiration("10000").build() 如果不设置代表永不过期;如果设置为0代表直接发送给消费者,否则丢弃消息
  12. *
  13. * @author Rem
  14. * @date 2021-12-27
  15. */
  16. public class DelayProducer {
  17. public static void main(String[] args) throws Exception {
  18. Channel channel = RabbitMqUtils.getChannel();
  19. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);
  20. System.err.println("开始发送消息...");
  21. Scanner scanner = new Scanner(System.in);
  22. while (scanner.hasNext()) {
  23. String message = scanner.next();
  24. /**
  25. * 发送消息
  26. * 设置消息过期时间 ms
  27. */
  28. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("8000").build();
  29. channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL, properties, message.getBytes());
  30. System.err.println("发送消息完毕" + message);
  31. }
  32. }
  33. }
  1. package com.rem.rabbitmq.ee.Idelay;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import com.rem.rabbitmq.ee.RabbitMqUtils;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * 普通消费者-延迟队列
  11. *
  12. * @author Rem
  13. * @date 2021-12-27
  14. */
  15. public class Consumer19 {
  16. public static void main(String[] args) throws Exception {
  17. Channel channel = RabbitMqUtils.getChannel();
  18. //声明死信交换机
  19. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);
  20. //声明死信队列
  21. channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);
  22. //死信队列绑定死信交换机与routingkey
  23. channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);
  24. /********************************************************************************/
  25. //声明普通交换机
  26. channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);
  27. //声明普通队列 并且绑定死信交换机
  28. //设置参数
  29. Map<String, Object> arguments = new HashMap<>();
  30. //设置过期时间 单位ms 10s 一般都在生产方设置
  31. //arguments.put("x-message-ttl", 10000);
  32. arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);
  33. arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);
  34. channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_TTL, false, false, false, arguments);
  35. //消费回调
  36. DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的死信消息:" + new String(message.getBody()));
  37. //取消消费回调 如果在消费的时候队列被删除了
  38. CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");
  39. /**
  40. * 直接延迟队列接收消息
  41. */
  42. System.out.println("消费者-DEAD 等待消费");
  43. channel.basicConsume(RabbitMqUtils.QUEUE_DEAD, true, deliverCallback, cancelCallback);
  44. }
  45. }

安装插件实现延迟队列

rabbitmq 插件安装
延迟队列插件下载

  1. #拷贝到rabbitmq容器 773067241f96 中
  2. docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
  3. #进入容器
  4. docker exec -it rabbitmq /bin/bash
  5. #启用插件
  6. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  7. #查看
  8. rabbitmq-plugins list
  9. #重新启动容器
  10. docker restart rabbitmq

集成springboot代码实现

  1. server:
  2. servlet:
  3. context-path: /rabbit
  4. spring:
  5. rabbitmq:
  6. host: remzhi.top
  7. port: 5672
  8. username: rem
  9. password: 123456
  1. package com.rem.rabbitmq.boot.Adelay;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.core.QueueBuilder;
  7. import org.springframework.beans.factory.annotation.Qualifier;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. /**
  11. * 使用死信队列实现 延迟队列
  12. *
  13. * @author Rem
  14. * @date 2021-12-29
  15. */
  16. @Configuration
  17. public class QueueConfig {
  18. /**
  19. * 普通交换机 队列 routingKey
  20. */
  21. public static final String X_EXCHANGE = "X";
  22. public static final String XA_ROUTING_KEY = "XA";
  23. public static final String XB_ROUTING_KEY = "XB";
  24. public static final String QA_QUEUE = "QA";
  25. public static final String QB_QUEUE = "QB";
  26. public static final String XC_ROUTING_KEY = "XC";
  27. public static final String QC_QUEUE = "QC";
  28. /**
  29. * 死信交换机 队列 routingKey
  30. */
  31. public static final String Y_DEAD_EXCHANGE = "Y";
  32. public static final String YD_DEAD_ROUTING_KEY = "YD";
  33. public static final String QD_DEAD_QUEUE = "QD";
  34. /*********************************普通队列QA*********************************************************/
  35. /**
  36. * 声明普通交换机X
  37. *
  38. * @return
  39. */
  40. @Bean("xExchange")
  41. public DirectExchange xExchange() {
  42. return new DirectExchange(X_EXCHANGE);
  43. }
  44. /**
  45. * 声明普通队列QA并且绑定死信交换机
  46. *
  47. * @return
  48. */
  49. @Bean("queueA")
  50. public Queue queueA() {
  51. /**
  52. * 手动设置参数
  53. */
  54. /* Map<String, Object> arguments = new HashMap<>();
  55. arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
  56. arguments.put("x-dead-letter-routing-key", YD_DEAD_ROUTING_KEY);
  57. arguments.put("x-message-ttl", 10000);
  58. return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();*/
  59. /**
  60. * 根据内置api设置参数 过期时间 ms
  61. */
  62. return QueueBuilder.durable(QA_QUEUE)
  63. .deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).ttl(10000).build();
  64. }
  65. /**
  66. * 普通交换机与普通队列QA通过 普通routingKey XA 绑定
  67. *
  68. * @param queueA
  69. * @param xExchange
  70. * @return
  71. */
  72. @Bean
  73. public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
  74. return BindingBuilder.bind(queueA).to(xExchange).with(XA_ROUTING_KEY);
  75. }
  76. /*********************************普通队列QB*********************************************************/
  77. /**
  78. * 声明普通队列QB 并且绑定死信交换机
  79. *
  80. * @return
  81. */
  82. @Bean("queueB")
  83. public Queue queueB() {
  84. /**
  85. * 根据内置api设置参数
  86. */
  87. return QueueBuilder.durable(QB_QUEUE)
  88. .deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).ttl(40000).build();
  89. }
  90. /**
  91. * 普通交换机与普通队列通过QB 普通routingKey 绑定
  92. *
  93. * @param queueB
  94. * @param xExchange
  95. * @return
  96. */
  97. @Bean
  98. public Binding queuebBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
  99. return BindingBuilder.bind(queueB).to(xExchange).with(XB_ROUTING_KEY);
  100. }
  101. /*********************************死信队列QD*********************************************************/
  102. /**
  103. * 声明死信交换机Y
  104. *
  105. * @return
  106. */
  107. @Bean("yExchange")
  108. public DirectExchange yExchange() {
  109. return new DirectExchange(Y_DEAD_EXCHANGE);
  110. }
  111. /**
  112. * 声明死信队列
  113. *
  114. * @return
  115. */
  116. @Bean("queueD")
  117. public Queue queueD() {
  118. return new Queue(QD_DEAD_QUEUE);
  119. }
  120. /**
  121. * 死信交换机与死信队列通过 死信routingKey XA 绑定
  122. *
  123. * @param queueD
  124. * @param yExchange
  125. * @return
  126. */
  127. @Bean
  128. public Binding deadLetterBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
  129. return BindingBuilder.bind(queueD).to(yExchange).with(YD_DEAD_ROUTING_KEY);
  130. }
  131. /*********************************普通队列QC*********************************************************/
  132. /**
  133. * 声明普通队列QC 并且绑定死信交换机
  134. * 不设置过期时间 交给生产方设置
  135. * *因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,
  136. * *而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
  137. *
  138. * @return
  139. */
  140. @Bean("queueC")
  141. public Queue queueC() {
  142. /**
  143. * 根据内置api设置参数
  144. */
  145. return QueueBuilder.durable(QC_QUEUE)
  146. .deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).build();
  147. }
  148. /**
  149. * 普通交换机与普通队列通过QC 普通routingKey 绑定
  150. *
  151. * @param queueC
  152. * @param xExchange
  153. * @return
  154. */
  155. @Bean
  156. public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {
  157. return BindingBuilder.bind(queueC).to(xExchange).with(XC_ROUTING_KEY);
  158. }
  159. }
  1. package com.rem.rabbitmq.boot.Adelay;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * 使用插件实现rabbitMQ 延迟队列
  8. *
  9. * @author Rem
  10. * @date 2022-01-03
  11. */
  12. @Configuration
  13. public class DelayQueueConfig {
  14. /**
  15. * 延迟交换机 队列 routingKey
  16. */
  17. public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
  18. public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";
  19. public static final String DELAY_QUEUE = "DELAY_QUEUE";
  20. @Bean
  21. public DirectExchange delayExchange() {
  22. //自定义参数
  23. // Map<String, Object> arguments = new HashMap<>();
  24. // //自定义交换机的类型
  25. // arguments.put("x-delayed-type", "direct");
  26. // return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
  27. //使用api设置
  28. return ExchangeBuilder.directExchange(DELAY_EXCHANGE).delayed().build();
  29. }
  30. @Bean
  31. public Queue delayQueue() {
  32. return QueueBuilder.durable(DELAY_QUEUE).build();
  33. }
  34. @Bean
  35. public Binding bindingDelayedQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
  36. return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
  37. }
  38. }
  1. package com.rem.rabbitmq.boot.Adelay;
  2. import io.swagger.annotations.Api;
  3. import io.swagger.annotations.ApiOperation;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.time.LocalDateTime;
  13. /**
  14. * @author Rem
  15. * @date 2021-12-30
  16. */
  17. @Slf4j
  18. @RestController
  19. @RequestMapping("/delay")
  20. @Api(tags = "延迟队列")
  21. public class SendMessageController {
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. @ApiOperation(value = "队列设置过期时间 的延迟消息")
  25. @GetMapping("/sendMsg/{message}")
  26. public void sendMsg(@PathVariable String message) {
  27. log.info("当前时间:{},发送一条信息给两个TTL队列:{}", LocalDateTime.now(), message);
  28. rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XA_ROUTING_KEY, "消息来自ttl为10S的队列:" + message);
  29. rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XB_ROUTING_KEY, "消息来自ttl为40S的队列:" + message);
  30. }
  31. @ApiOperation(value = "消息设置过期时间 的延迟消息")
  32. @GetMapping("/sendMsg/{message}/{ttlTime}")
  33. public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
  34. log.info("当前时间:{},发送一条信息:{},,并且设置ttl:{}", LocalDateTime.now(), message, ttlTime);
  35. /**
  36. * setExpiration 设置过期时间
  37. */
  38. MessagePostProcessor messagePostProcessor = messageProperties -> {
  39. messageProperties.getMessageProperties().setExpiration(ttlTime);
  40. return messageProperties;
  41. };
  42. rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XC_ROUTING_KEY, message, messagePostProcessor);
  43. }
  44. @ApiOperation(value = "设置插件-发送生产端设置过期时间 的延迟消息")
  45. @GetMapping("/sendDelayMsg/{message}/{ttlTime}")
  46. public void sendDelayMsg(@PathVariable String message, @PathVariable Integer ttlTime) {
  47. log.info("当前时间:{},发送一条信息:{},,使用延迟队列插件 并且设置ttl:{}", LocalDateTime.now(), message, ttlTime);
  48. /**
  49. * setDelay 设置延迟时间
  50. */
  51. MessagePostProcessor messagePostProcessor = messageProperties -> {
  52. messageProperties.getMessageProperties().setDelay(ttlTime);
  53. return messageProperties;
  54. };
  55. rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_ROUTING_KEY, message, messagePostProcessor);
  56. }
  57. }
  1. /**
  2. * 监听死信队列里的消息
  3. *
  4. * @param message
  5. */
  6. @RabbitListener(queues = QueueConfig.QD_DEAD_QUEUE)
  7. public void receiveD(Message message) {
  8. String msg = new String(message.getBody());
  9. log.info("当前时间:{},收到死信队列信息{}", LocalDateTime.now(), msg);
  10. }
  11. /**
  12. * 监听使用插件的延迟队列消息
  13. *
  14. * @param message
  15. */
  16. @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
  17. public void receiveDelay(Message message) {
  18. String msg = new String(message.getBody());
  19. log.info("当前时间:{},收到延迟队列信息{}", LocalDateTime.now(), msg);
  20. }

八、发布确认高级

  1. server:
  2. servlet:
  3. context-path: /rabbit
  4. spring:
  5. rabbitmq:
  6. host: remzhi.top
  7. port: 5672
  8. username: rem
  9. password: 123456
  10. #设置发布确认模式
  11. # correlated :发布消息成功到交换器后会触发回调方法
  12. # simple:其一效果和CORRELATED值一样会触发回调方法
  13. #其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法 等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑
  14. #要注意的点是 waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
  15. publisher-confirm-type: correlated
  16. #设置消息回退 默认false
  17. #true时 当无法被路由的消息会回退给生产者
  18. publisher-returns: true
  1. package com.rem.rabbitmq.boot.Bconfirm;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @author Rem
  8. * @date 2021-12-29
  9. */
  10. @Configuration
  11. public class ConfirmConfig {
  12. /**
  13. * 发布确认交换机 队列 routingKey
  14. */
  15. public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";
  16. public static final String CONFIRM_ROUTING_KEY = "CONFIRM_ROUTING_KEY";
  17. public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";
  18. /**
  19. * 声明业务交换机
  20. *
  21. * @return
  22. */
  23. @Bean("confirmExchange")
  24. public DirectExchange confirmExchange() {
  25. return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).build();
  26. }
  27. /**
  28. * 声明发布确认队列
  29. *
  30. * @return
  31. */
  32. @Bean("confirmQueue")
  33. public Queue confirmQueue() {
  34. return QueueBuilder.durable(CONFIRM_QUEUE).build();
  35. }
  36. /**
  37. * 声明绑定关系
  38. *
  39. * @param confirmQueue
  40. * @param confirmExchange
  41. * @return
  42. */
  43. @Bean
  44. public Binding confirmQueueaBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
  45. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
  46. }
  47. }
  1. package com.rem.rabbitmq.boot.Bconfirm;
  2. import io.swagger.annotations.Api;
  3. import io.swagger.annotations.ApiOperation;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.time.LocalDateTime;
  13. /**
  14. * @author Rem
  15. * @date 2021-12-30
  16. */
  17. @Slf4j
  18. @RestController
  19. @RequestMapping("/comfirm")
  20. @Api(tags = "发布确认")
  21. public class ConfirmController {
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. @ApiOperation(value = "发送一条确认的消息")
  25. @GetMapping("/sendMsg/{message}")
  26. public void sendMsg(@PathVariable String message) {
  27. log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);
  28. CorrelationData correlationData = new CorrelationData("111");
  29. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY, ConfirmConfig.CONFIRM_ROUTING_KEY + ":" + message, correlationData);
  30. /**
  31. * 发送到一个假的exchange上
  32. * 消息ack为false
  33. */
  34. CorrelationData correlationData2 = new CorrelationData("222");
  35. rabbitTemplate.convertAndSend("FAKE_EXCHANGE", ConfirmConfig.CONFIRM_ROUTING_KEY, ConfirmConfig.CONFIRM_ROUTING_KEY + ":" + message, correlationData2);
  36. /**
  37. * 发送到一个假的routingKey上
  38. * 消息会被退回
  39. */
  40. CorrelationData correlationData3 = new CorrelationData("333");
  41. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, "FAKE_ROUTING_KEY", "FAKE_ROUTING_KEY" + ":" + message, correlationData3);
  42. log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);
  43. }
  44. }
  1. /**
  2. * 发布确认队列监听
  3. *
  4. * @param message
  5. */
  6. @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
  7. public void receiveConfirem(Message message) {
  8. String msg = new String(message.getBody());
  9. log.info("发布确认消费者收到的消息是:{}", msg);
  10. }
  11. /**
  12. * 报警队列监听
  13. *
  14. * @param message
  15. */
  16. @RabbitListener(queues = BackupExchangeConfig.WARING_QUEUE)
  17. public void receiveBackupWaring(Message message) {
  18. String msg = new String(message.getBody());
  19. log.info("报警队列收到的消息是:{}", msg);
  20. }

备份交换机

  1. package com.rem.rabbitmq.boot.CbackupExchange;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @author Rem
  8. * @date 2022/1/8
  9. */
  10. @Configuration
  11. public class BackupExchangeConfig {
  12. /**
  13. * 普通交换机(发布确认) 队列 routingKey
  14. */
  15. public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
  16. public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
  17. public static final String NORMAL_QUEUE = "NORMAL_QUEUE";
  18. /**
  19. * 备份交换机 备份队列 报警队列
  20. */
  21. public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";
  22. public static final String BACKUP_QUEUE = "BACKUP_QUEUE";
  23. public static final String WARING_QUEUE = "WARING_QUEUE";
  24. /**
  25. * 声明备份交换机
  26. *
  27. * @return
  28. */
  29. @Bean("backupExchange")
  30. public FanoutExchange backupExchange() {
  31. return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).build();
  32. }
  33. /**
  34. * 声明备份队列
  35. *
  36. * @return
  37. */
  38. @Bean("backupQueue")
  39. public Queue backupQueue() {
  40. return QueueBuilder.durable(BACKUP_QUEUE).build();
  41. }
  42. /**
  43. * 声明报警队列
  44. *
  45. * @return
  46. */
  47. @Bean("waringQueue")
  48. public Queue waringQueue() {
  49. return QueueBuilder.durable(WARING_QUEUE).build();
  50. }
  51. /**
  52. * 声明备份队列绑定关系
  53. *
  54. * @param backupQueue
  55. * @param backupExchange
  56. * @return
  57. */
  58. @Bean
  59. public Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
  60. return BindingBuilder.bind(backupQueue).to(backupExchange);
  61. }
  62. /**
  63. * 声明报警队列绑定关系
  64. *
  65. * @param waringQueue
  66. * @param backupExchange
  67. * @return
  68. */
  69. @Bean
  70. public Binding waringQueueaBinding(@Qualifier("waringQueue") Queue waringQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
  71. return BindingBuilder.bind(waringQueue).to(backupExchange);
  72. }
  73. /**
  74. * 声明业务交换机
  75. * **绑定备份交换机
  76. *
  77. * @return
  78. */
  79. @Bean("normalExchange")
  80. public DirectExchange normalExchange() {
  81. return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).alternate(BACKUP_EXCHANGE).build();
  82. }
  83. /**
  84. * 声明发布确认队列
  85. *
  86. * @return
  87. */
  88. @Bean("normalQueue")
  89. public Queue normalQueue() {
  90. return QueueBuilder.durable(NORMAL_QUEUE).build();
  91. }
  92. /**
  93. * 声明普通绑定关系
  94. *
  95. * @param normalQueue
  96. * @param normalExchange
  97. * @return
  98. */
  99. @Bean
  100. public Binding normalQueueaBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") DirectExchange normalExchange) {
  101. return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
  102. }
  103. }
  1. package com.rem.rabbitmq.boot;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.ReturnedMessage;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. /**
  10. * @author Rem
  11. * @date 2022/1/5
  12. */
  13. @Component
  14. @Slf4j
  15. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. //依赖注入rabbitTemplate之后再设置它的回调对象
  19. @PostConstruct
  20. public void init() {
  21. rabbitTemplate.setConfirmCallback(this);
  22. rabbitTemplate.setReturnsCallback(this);
  23. }
  24. /**
  25. * 回调接口
  26. * 当发送的消息传给不存在的交换机 ack为false
  27. * 当发送的消息传给存在的交换机,但是routingkey错误时 ack依然是true 表示交换机收到了消息但是是路由出错
  28. *
  29. * @param correlationData 发布确认基类
  30. * @param ack
  31. * @param cause
  32. */
  33. @Override
  34. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  35. if (correlationData != null) {
  36. String id = correlationData.getId();
  37. if (ack) {
  38. log.info("交换机已经收到id为:{}的消息", id);
  39. } else {
  40. log.warn("交换机还未收到id为:{}消息,由于原因:{}", id, cause);
  41. }
  42. }
  43. }
  44. /**
  45. * 只有消息不可到达时 才会回退消息
  46. * 回退消息
  47. *
  48. * @param returned
  49. */
  50. @Override
  51. public void returnedMessage(ReturnedMessage returned) {
  52. String message = new String(returned.getMessage().getBody());
  53. log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",
  54. message, returned.getExchange(), returned.getReplyText(), returned.getRoutingKey());
  55. }
  56. }
  1. package com.rem.rabbitmq.boot.CbackupExchange;
  2. import io.swagger.annotations.Api;
  3. import io.swagger.annotations.ApiOperation;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.time.LocalDateTime;
  13. /**
  14. * @author Rem
  15. * @date 2022/1/8
  16. */
  17. @Slf4j
  18. @RestController
  19. @RequestMapping("/backup")
  20. @Api(tags = "备份交换机")
  21. public class BackupExchangeController {
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. @ApiOperation(value = "备份交换机发送一条确认的消息")
  25. @GetMapping("/sendMsg/{message}")
  26. public void sendMsg(@PathVariable String message) {
  27. log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);
  28. CorrelationData correlationData = new CorrelationData("111");
  29. rabbitTemplate.convertAndSend(BackupExchangeConfig.NORMAL_EXCHANGE, BackupExchangeConfig.NORMAL_ROUTING_KEY, BackupExchangeConfig.NORMAL_ROUTING_KEY + ":" + message, correlationData);
  30. /**
  31. * 发送到一个假的exchange上
  32. * 消息ack为false
  33. */
  34. CorrelationData correlationData2 = new CorrelationData("222");
  35. rabbitTemplate.convertAndSend("FAKE_EXCHANGE", BackupExchangeConfig.NORMAL_EXCHANGE, BackupExchangeConfig.NORMAL_ROUTING_KEY + ":" + message, correlationData2);
  36. /**
  37. * 发送到一个假的routingKey上
  38. * 设置了备份交换机 消息不会被退回 (备份交换机的级别比设置退回消息权限高)
  39. */
  40. CorrelationData correlationData3 = new CorrelationData("333");
  41. rabbitTemplate.convertAndSend(BackupExchangeConfig.NORMAL_EXCHANGE, "FAKE_ROUTING_KEY", "FAKE_ROUTING_KEY" + ":" + message, correlationData3);
  42. log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);
  43. }
  44. }

九、rabbitMQ中其他知识点

9.1 幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。

解决思路

  • redis天然具有幂等性 可以使用redis执行setnx,从而实现不重复消费
  • 唯一id加指纹码机制,在分布式系统中生成唯一的mq id 来实现

    9.2 优先级队列

    ```java package com.rem.rabbitmq.boot.Dpriority;

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/**

  • 优先级队列 *
  • @author Rem
  • @date 2021-12-29 */

@Configuration public class PriotiryConfig {

  1. public static final String PRIOTIRY_EXCHANGE = "PRIOTIRY_EXCHANGE";
  2. public static final String PRIOTIRY_ROUTING_KEY = "PRIOTIRY_ROUTING_KEY";
  3. public static final String PRIOTIRY_QUEUE = "PRIOTIRY_QUEUE";
  4. @Bean("priotiryExchange")
  5. public DirectExchange priotiryExchange() {
  6. return ExchangeBuilder.directExchange(PRIOTIRY_EXCHANGE).build();
  7. }
  8. /**
  9. * 设置优先级 0~255 官网推荐1-10 设置高比较吃内存和cpu
  10. * 发送的消息也必须设置优先级
  11. *
  12. * @return
  13. */
  14. @Bean("priotiryQueue")
  15. public Queue priotiryQueue() {
  16. return QueueBuilder.durable(PRIOTIRY_QUEUE).maxPriority(5).build();
  17. }
  18. @Bean
  19. public Binding priotiryQueueaBinding(@Qualifier("priotiryQueue") Queue priotiryQueue, @Qualifier("priotiryExchange") DirectExchange priotiryExchange) {
  20. return BindingBuilder.bind(priotiryQueue).to(priotiryExchange).with(PRIOTIRY_ROUTING_KEY);
  21. }

}

  1. ```java
  2. package com.rem.rabbitmq.boot.Dpriority;
  3. import io.swagger.annotations.Api;
  4. import io.swagger.annotations.ApiOperation;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.MessagePostProcessor;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.web.bind.annotation.GetMapping;
  10. import org.springframework.web.bind.annotation.PathVariable;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import java.time.LocalDateTime;
  14. /**
  15. * @author Rem
  16. * @date 2021-12-30
  17. */
  18. @Slf4j
  19. @RestController
  20. @RequestMapping("/priotiry")
  21. @Api(tags = "优先级队列")
  22. public class PriotiryController {
  23. @Autowired
  24. private RabbitTemplate rabbitTemplate;
  25. @ApiOperation(value = "发送一条的消息并且遍历")
  26. @GetMapping("/sendMsg/{message}")
  27. public void sendMsg(@PathVariable String message) {
  28. for (int i = 0; i < 10; i++) {
  29. String msg = i + ":" + message;
  30. log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), msg);
  31. if (i % 3 == 0) {
  32. //给消息赋予一个priority属性
  33. Integer finalI = i;
  34. MessagePostProcessor messagePostProcessor = messageProperties -> {
  35. messageProperties.getMessageProperties().setPriority(finalI);
  36. return messageProperties;
  37. };
  38. rabbitTemplate.convertAndSend(PriotiryConfig.PRIOTIRY_EXCHANGE, PriotiryConfig.PRIOTIRY_ROUTING_KEY, msg, messagePostProcessor);
  39. } else {
  40. rabbitTemplate.convertAndSend(PriotiryConfig.PRIOTIRY_EXCHANGE, PriotiryConfig.PRIOTIRY_ROUTING_KEY, msg);
  41. }
  42. log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), msg);
  43. }
  44. }
  45. }
  1. /**
  2. * 优先级队列监听
  3. *
  4. * @param message
  5. */
  6. @RabbitListener(queues = PriotiryConfig.PRIOTIRY_QUEUE)
  7. public void receivePriotiry(Message message) {
  8. String msg = new String(message.getBody());
  9. log.info("优先级队列收到的消息是:{}", msg);
  10. }

9.3 惰性队列

  1. package com.rem.rabbitmq.boot.Elazy;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * 惰性队列
  8. * 在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB
  9. *
  10. * @author Rem
  11. * @date 2021-12-29
  12. */
  13. @Configuration
  14. public class LazyConfig {
  15. public static final String LAZY_EXCHANGE = "LAZY_EXCHANGE";
  16. public static final String LAZY_ROUTING_KEY = "LAZY_ROUTING_KEY";
  17. public static final String LAZY_QUEUE = "LAZY_QUEUE";
  18. @Bean("lazyExchange")
  19. public DirectExchange lazyExchange() {
  20. return ExchangeBuilder.directExchange(LAZY_EXCHANGE).build();
  21. }
  22. /**
  23. * 声明队列为惰性队列 消息直接存放在磁盘中
  24. *
  25. * @return
  26. */
  27. @Bean("lazyQueue")
  28. public Queue lazyQueue() {
  29. return QueueBuilder.durable(LAZY_QUEUE).lazy().build();
  30. }
  31. @Bean
  32. public Binding lazyQueueaBinding(@Qualifier("lazyQueue") Queue lazyQueue, @Qualifier("lazyExchange") DirectExchange lazyExchange) {
  33. return BindingBuilder.bind(lazyQueue).to(lazyExchange).with(LAZY_ROUTING_KEY);
  34. }
  35. }
  1. package com.rem.rabbitmq.boot.Elazy;
  2. import io.swagger.annotations.Api;
  3. import io.swagger.annotations.ApiOperation;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import java.time.LocalDateTime;
  12. /**
  13. * @author Rem
  14. * @date 2021-12-30
  15. */
  16. @Slf4j
  17. @RestController
  18. @RequestMapping("/lazy")
  19. @Api(tags = "惰性队列")
  20. public class LazyController {
  21. @Autowired
  22. private RabbitTemplate rabbitTemplate;
  23. @ApiOperation(value = "发送一条的消息")
  24. @GetMapping("/sendMsg/{message}")
  25. public void sendMsg(@PathVariable String message) {
  26. log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);
  27. rabbitTemplate.convertAndSend(LazyConfig.LAZY_EXCHANGE, LazyConfig.LAZY_ROUTING_KEY, message);
  28. log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);
  29. }
  30. }
  1. /**
  2. * 惰性队列监听
  3. *
  4. * @param message
  5. */
  6. @RabbitListener(queues = LazyConfig.LAZY_QUEUE)
  7. public void receiveLazy(Message message) {
  8. String msg = new String(message.getBody());
  9. log.info("惰性队列 的消息是:{}", msg);
  10. }

十、rabbitMQ集群

10.1 docker 搭建rabbitMQ集群

创建三个子文件夹

  1. mkdir -p /home/rabbitmq
  2. cd /mydata/rabbitmq-cluster
  3. mkdir rabbitmq-1 rabbitmq-2 rabbitmq-3

分别创建三个docker容器

  1. docker run --hostname rabbitmq-1 --name rabbitmq-1 \
  2. -p 15675:15672 \
  3. -p 5675:5672 \
  4. -v /home/rabbitmq/rabbitmq-1:/var/lib/rabbitmq \
  5. -e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \
  6. -id rabbitmq:management
  1. docker run --hostname rabbitmq-2 --name rabbitmq-2 \
  2. -p 15673:15672 \
  3. -p 5673:5672 \
  4. -v /home/rabbitmq/rabbitmq-2:/var/lib/rabbitmq \
  5. -e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \
  6. --link rabbitmq-1:rabbitmq-1 \
  7. -id rabbitmq:management
  1. docker run --hostname rabbitmq-3 --name rabbitmq-3 \
  2. -p 15674:15672 \
  3. -p 5674:5672 \
  4. -v /home/rabbitmq/rabbitmq-3:/var/lib/rabbitmq \
  5. -e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \
  6. --link rabbitmq-1:rabbitmq-1 \
  7. --link rabbitmq-2:rabbitmq-2 \
  8. -id rabbitmq:management

让节点加入集群

  1. docker exec -it rabbitmq-1 /bin/bash
  2. rabbitmqctl stop_app
  3. rabbitmqctl reset
  4. rabbitmqctl start_app
  1. docker exec -it rabbitmq-2 /bin/bash
  2. rabbitmqctl stop_app
  3. rabbitmqctl reset
  4. rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
  5. rabbitmqctl start_app
  1. docker exec -it rabbitmq-1 /bin/bash
  2. rabbitmqctl stop_app
  3. rabbitmqctl reset
  4. rabbitmqctl start_app

访问任一节点 就可以看到集群情况了
image.png

10.2 镜像队列

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

自定义一个已rem 开头的 自定义镜像队列

image.png

创建结果
image.png

创建一个已rem开始的队列 在控制台就可以看到备份了一份,如果1号宕机,就会自动在三号再备份一份
image.png
image.png

10.3 高可用负载均衡

HAProxy提供高可用性、负载均衡及基于TCPHTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。

如果前面配置的HAProxy主机突然宕机或者网卡失效,那么虽然RbbitMQ集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入Keepalived它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移.

搭建效果图
1234.jpg