插件实现延迟队列
1.比对
在上一篇中我们讲到 DLX + TTL 的两种方式实现延迟队列,但都有各自的问题
- 给队列设置 TTL,不能灵活动态配置
- 给消息设置 TTL,导致消息时序问题,已经过期了的消息被阻塞导致不能及时被消费
这里将使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区中。
我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。消息发布在交换机 x-delayed-message 时,消息它不会立即进入对应队列,而是先将消息保存至 Mnesia (一个分布式数据库管理系统(DBMS)) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中。
在声明 x-delayed-message 类型的 Exchange 时,需要配置一个参数 “x-delayed-type”
相当于配置此交换机的路由行为,因为 “x-delay -message” 交换机只充当代理。可选参数为之前提到的基本交换机类型 :direct, topic 等
Docker 安装 rabbitmq-delayed-message-exchange 插件
前往下载列表来根据你的 RabbitMQ 版本来安装对应插件版本 rabbitmq-delayed-message-exchange Tags
此处使用的版本为 rabbitmq-delayed-message-exchange v3.8.x
下载好后的文件有:rabbitmq_delayed_message_exchange-3.8.0.ez
注意解压后,要将后面的版本号给去除 rabbitmq_delayed_message_exchange
不然 rabbitmq-plugins enable 启动插件后,会显示
Error: {:plugins_not_found, [:”rabbitmq_delayed_message_exchange-3.8.0”]}
插件导入
RabbitMQ 的插件目录在 docker 容器中的目录为: /plugins ,所以我们要将解压好的插件导入到 docker 容器中。
docker cp 本地文件路径 counterID:/plugins
docker cp ./rabbitmq_delayed_message_exchange 73c64:/plugins。
以交互的形式进入容器启动插件
[root@vdevops /]# docker exec -it 73c64 /bin/bash
bash-5.0# cd /plugins
启动
rabbitmq-plugins enable
bash-5.0# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@73c64cfd012e:
rabbitmq_delayed_message_exchange
Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
duplicate_plugin}]
Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
duplicate_plugin}]
Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
duplicate_plugin}]
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@73c64cfd012e...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
查看插件启动列表: rabbitmq-plugins list
bash-5.0# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
duplicate_plugin}]
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@73c64cfd012e
|/
[ ] rabbitmq_amqp1_0 3.7.17
[ ] rabbitmq_auth_backend_cache 3.7.17
[ ] rabbitmq_auth_backend_http 3.7.17
[ ] rabbitmq_auth_backend_ldap 3.7.17
[ ] rabbitmq_auth_mechanism_ssl 3.7.17
[ ] rabbitmq_consistent_hash_exchange 3.7.17
[E*] rabbitmq_delayed_message_exchange 3.8.0
[ ] rabbitmq_event_exchange 3.7.17
[ ] rabbitmq_federation 3.7.17
[ ] rabbitmq_federation_management 3.7.17
[ ] rabbitmq_jms_topic_exchange 3.7.17
[E*] rabbitmq_management 3.7.17
[e*] rabbitmq_management_agent 3.7.17
[ ] rabbitmq_mqtt 3.7.17
[ ] rabbitmq_peer_discovery_aws 3.7.17
[ ] rabbitmq_peer_discovery_common 3.7.17
[ ] rabbitmq_peer_discovery_consul 3.7.17
[ ] rabbitmq_peer_discovery_etcd 3.7.17
[ ] rabbitmq_peer_discovery_k8s 3.7.17
[ ] rabbitmq_random_exchange 3.7.17
[ ] rabbitmq_recent_history_exchange 3.7.17
[ ] rabbitmq_sharding 3.7.17
[ ] rabbitmq_shovel 3.7.17
[ ] rabbitmq_shovel_management 3.7.17
[ ] rabbitmq_stomp 3.7.17
[ ] rabbitmq_top 3.7.17
[ ] rabbitmq_tracing 3.7.17
[ ] rabbitmq_trust_store 3.7.17
[e*] rabbitmq_web_dispatch 3.7.17
[ ] rabbitmq_web_mqtt 3.7.17
[ ] rabbitmq_web_mqtt_examples 3.7.17
[ ] rabbitmq_web_stomp 3.7.17
[ ] rabbitmq_web_stomp_examples 3.7.17
可以看见 rabbitmq_delayed_message_exchange 已经启动
退出容器,并重启 RabbitMQ 即可
bash-5.0# exit
[root@vdevops /]# docker restart 73c64
重启后,前往 RabbitMQ web 控制台中,在创建交换机的选项中我们能发现多了一项类型:x-delayed-message
3.使用
结构图
定义交换机,队列,及其绑定
/**
* 定义延迟交换机
* 使用 CustomExchange 方式创建自定义的交换机,类型为我们的 - 延迟交换机
*/
@Bean
public CustomExchange xDelayExchange() {
String exchangeName = "x_delay_exchange";
Map<String, Object> args = new HashMap<String, Object>(1);
// 这里使用直连方式的路由,如果想使用不同的路由行为,可以修改,如 topic
args.put("x-delayed-type", "direct");
return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueueC() {
String queueName = "delay_queue_C";
return new Queue(queueName, true, false, false);
}
@Bean
public Binding bindingDelayExchange() {
String routingKey = "bind.delay.C";
return BindingBuilder.bind(delayQueueC()).to(xDelayExchange()).with(routingKey).noargs();
}
消费者
@RabbitListener(queues = "delay_queue_C")
public void receiverC(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",延迟队列 - dlx_queue_C 收到消息:" + msg);
channel.basicAck(deliveryTag, false);
}
生产者
@Test
public void demo_delay_plugins() throws Exception {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String exchange = "x_delay_exchange";
String routingKey = "bind.delay.C";
String msg = "我是第一条消息";
Integer delayTime = 6000;
System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + " 开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyDelayMessagePostProcessor(delayTime));
msg = "我是第二条消息";
delayTime = 3000;
System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + " 开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyDelayMessagePostProcessor(delayTime));
Thread.sleep(30000L);
}
MyDelayMessagePostProcessor
class MyDelayMessagePostProcessor implements MessagePostProcessor {
// 延迟时间 毫秒
private Integer delayTime;
MyDelayMessagePostProcessor(Integer delayTime) {
this.delayTime = delayTime;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置延迟时间
message.getMessageProperties().setDelay(delayTime);
// 两种方式设置延迟时间同样
// message.getMessageProperties().getHeaders().put("x-delay", delayTime);
return message;
}
}
输出
当前时间:2020-09-28 21:10:20 开始发送消息:我是第一条消息 延迟的时间为:6000
当前时间:2020-09-28 21:10:20 开始发送消息:我是第二条消息 延迟的时间为:3000
当前时间:2020-09-28 21:10:23,延迟队列 - dlx_queue_C 收到消息:我是第一条消息
当前时间:2020-09-28 21:10:26,延迟队列 - dlx_queue_C 收到消息:我是第二条消息
4.局限性
- 由于 “x-delayed-type” 参数,可以使用此交换机来代替其他交换机,因为 “x-delayed-message” 交换机只是充当代理,可能会对性能产生影响,比实际使用的基本交换机要慢。
- 使用 Erlang 的定时器,所以有延时时长:0<=n<=(2^32)-1 ,单位毫秒。
- 目前这个插件的设计并不适合大量延迟消息的情况(例如100条数千条或数百万条)。详见 #72](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
关闭该插件使用
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5.总结
DLX + TTL 和 Delayed Message 插件这两种 RabbitMQ 延迟消息解决方案都有一定的局限性。
如果你的消息 TTL 是相同的,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。
如果你的消息 TTL 过期值是可变的,可以尝试使用 Delayed Message 插件,对于某些应用而言它可能很好用,对于那些可能会达到高容量延迟消息的应用而言,则不是很友好。