介绍

:::tips 因为延迟队列的需求非常多,所以RabbitMQ官方推出了DelayExchange插件,原生支持延迟队列效果

使用DelayExchange插件需要将一个交换机声明为delayed类型,当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果消息中有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列 :::

    安装

    下载

    :::tips RabbitMQ官方插件社区:社区地址,里面包含各种各样的插件,其中就包括DelayExchange插件:插件地址,对应RabbitMQ 3.8.5以上的版本

下载好之后,将这个文件上传到RabbitMQ容器的插件目录 :::

进入容器

:::tips 上传之后,进入容器内部来安装这个插件 :::

  1. docker exec -it RabbitMQ容器名 bash

开启插件

:::tips 进入容器之后,执行下面的命令来开启插件 :::

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

发送消息

:::tips 消息生产者在发送消息时需要携带x-delay属性,指定消息的延迟时间 :::

  1. @SpringBootTest
  2. public class MyTest{
  3. //注入RabbitTemplate对象
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void test(){
  8. //指定交换机名称
  9. String exchangeName = "exchange.delay";
  10. //指定路由key
  11. String routingKey = "delay";
  12. //创建消息
  13. Message message = MessageBuilder
  14. .withBody("这是一条测试消息".getBytes(StandardCharsets.UTF_8))
  15. .setHeader("x-delay", 10000) //设置消息延迟时间,单位:毫秒
  16. .build();
  17. //发送消息
  18. rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
  19. }
  20. }

接收消息

:::tips 在消息消费者中声明交换机的类型为DelayExchange类型 :::

基于注解方式

  1. //将这个类注册到Spring容器中
  2. @Component
  3. public class RabbitMqListener{
  4. //声明并绑定队列和交换机,同时监听队列中的消息
  5. @RabbitListener(bindings = @QueueBinding(
  6. value = @Queue(name = "delay.queue"),
  7. exchange = @Exchange(name = "exchange.delay", delayed = "true"),
  8. key = "delay"
  9. ))
  10. public void listen(String msg){
  11. System.out.println("接收到消息:" + msg);
  12. }
  13. }

基于@Bean方式

  1. @Configuration
  2. public class XxxConfig{
  3. //声明DelayExchange类型的交换机
  4. @Bean
  5. public DirectExchange delayedExchange(){
  6. return ExchangeBuilder
  7. .directExchange("exchange.delay") //指定交换机的类型和名称
  8. .delayed() //指定delay属性为true
  9. .durable(true) //设置为持久化队列
  10. .build();
  11. }
  12. }