介绍
:::tips 因为延迟队列的需求非常多,所以RabbitMQ官方推出了DelayExchange插件,原生支持延迟队列效果
使用DelayExchange插件需要将一个交换机声明为delayed类型,当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果消息中有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
:::
安装
下载
:::tips RabbitMQ官方插件社区:社区地址,里面包含各种各样的插件,其中就包括DelayExchange插件:插件地址,对应RabbitMQ 3.8.5以上的版本
下载好之后,将这个文件上传到RabbitMQ容器的插件目录 :::
进入容器
:::tips 上传之后,进入容器内部来安装这个插件 :::
docker exec -it RabbitMQ容器名 bash
开启插件
:::tips 进入容器之后,执行下面的命令来开启插件 :::
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
发送消息
:::tips 消息生产者在发送消息时需要携带x-delay属性,指定消息的延迟时间 :::
@SpringBootTest
public class MyTest{
//注入RabbitTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//指定交换机名称
String exchangeName = "exchange.delay";
//指定路由key
String routingKey = "delay";
//创建消息
Message message = MessageBuilder
.withBody("这是一条测试消息".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 10000) //设置消息延迟时间,单位:毫秒
.build();
//发送消息
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
接收消息
:::tips 在消息消费者中声明交换机的类型为DelayExchange类型 :::
基于注解方式
//将这个类注册到Spring容器中
@Component
public class RabbitMqListener{
//声明并绑定队列和交换机,同时监听队列中的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "exchange.delay", delayed = "true"),
key = "delay"
))
public void listen(String msg){
System.out.println("接收到消息:" + msg);
}
}
基于@Bean方式
@Configuration
public class XxxConfig{
//声明DelayExchange类型的交换机
@Bean
public DirectExchange delayedExchange(){
return ExchangeBuilder
.directExchange("exchange.delay") //指定交换机的类型和名称
.delayed() //指定delay属性为true
.durable(true) //设置为持久化队列
.build();
}
}