插件实现延迟队列

1.比对

在上一篇中我们讲到 DLX + TTL 的两种方式实现延迟队列,但都有各自的问题

  1. 给队列设置 TTL,不能灵活动态配置
  2. 给消息设置 TTL,导致消息时序问题,已经过期了的消息被阻塞导致不能及时被消费

这里将使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区中。
我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。消息发布在交换机 x-delayed-message 时,消息它不会立即进入对应队列,而是先将消息保存至 Mnesia (一个分布式数据库管理系统(DBMS)) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中。
在声明 x-delayed-message 类型的 Exchange 时,需要配置一个参数 “x-delayed-type”
相当于配置此交换机的路由行为,因为 “x-delay -message” 交换机只充当代理。可选参数为之前提到的基本交换机类型 :direct, topic 等

Docker 安装 rabbitmq-delayed-message-exchange 插件

前往下载列表来根据你的 RabbitMQ 版本来安装对应插件版本 rabbitmq-delayed-message-exchange Tags
此处使用的版本为 rabbitmq-delayed-message-exchange v3.8.x
下载好后的文件有:rabbitmq_delayed_message_exchange-3.8.0.ez
注意解压后,要将后面的版本号给去除 rabbitmq_delayed_message_exchange
不然 rabbitmq-plugins enable 启动插件后,会显示
Error: {:plugins_not_found, [:”rabbitmq_delayed_message_exchange-3.8.0”]}

插件导入

RabbitMQ 的插件目录在 docker 容器中的目录为: /plugins ,所以我们要将解压好的插件导入到 docker 容器中。
docker cp 本地文件路径 counterID:/plugins
docker cp ./rabbitmq_delayed_message_exchange 73c64:/plugins。

以交互的形式进入容器启动插件

  1. [root@vdevops /]# docker exec -it 73c64 /bin/bash
  2. bash-5.0# cd /plugins

可以看到在根目录下的 plugins 中找到我们的插件。

启动

rabbitmq-plugins enable

  1. bash-5.0# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  2. Enabling plugins on node rabbit@73c64cfd012e:
  3. rabbitmq_delayed_message_exchange
  4. Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
  5. duplicate_plugin}]
  6. Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
  7. duplicate_plugin}]
  8. Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
  9. duplicate_plugin}]
  10. The following plugins have been configured:
  11. rabbitmq_delayed_message_exchange
  12. rabbitmq_management
  13. rabbitmq_management_agent
  14. rabbitmq_web_dispatch
  15. Applying plugin configuration to rabbit@73c64cfd012e...
  16. The following plugins have been enabled:
  17. rabbitmq_delayed_message_exchange
  18. started 1 plugins.

查看插件启动列表: rabbitmq-plugins list

  1. bash-5.0# rabbitmq-plugins list
  2. Listing plugins with pattern ".*" ...
  3. Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange",
  4. duplicate_plugin}]
  5. Configured: E = explicitly enabled; e = implicitly enabled
  6. | Status: * = running on rabbit@73c64cfd012e
  7. |/
  8. [ ] rabbitmq_amqp1_0 3.7.17
  9. [ ] rabbitmq_auth_backend_cache 3.7.17
  10. [ ] rabbitmq_auth_backend_http 3.7.17
  11. [ ] rabbitmq_auth_backend_ldap 3.7.17
  12. [ ] rabbitmq_auth_mechanism_ssl 3.7.17
  13. [ ] rabbitmq_consistent_hash_exchange 3.7.17
  14. [E*] rabbitmq_delayed_message_exchange 3.8.0
  15. [ ] rabbitmq_event_exchange 3.7.17
  16. [ ] rabbitmq_federation 3.7.17
  17. [ ] rabbitmq_federation_management 3.7.17
  18. [ ] rabbitmq_jms_topic_exchange 3.7.17
  19. [E*] rabbitmq_management 3.7.17
  20. [e*] rabbitmq_management_agent 3.7.17
  21. [ ] rabbitmq_mqtt 3.7.17
  22. [ ] rabbitmq_peer_discovery_aws 3.7.17
  23. [ ] rabbitmq_peer_discovery_common 3.7.17
  24. [ ] rabbitmq_peer_discovery_consul 3.7.17
  25. [ ] rabbitmq_peer_discovery_etcd 3.7.17
  26. [ ] rabbitmq_peer_discovery_k8s 3.7.17
  27. [ ] rabbitmq_random_exchange 3.7.17
  28. [ ] rabbitmq_recent_history_exchange 3.7.17
  29. [ ] rabbitmq_sharding 3.7.17
  30. [ ] rabbitmq_shovel 3.7.17
  31. [ ] rabbitmq_shovel_management 3.7.17
  32. [ ] rabbitmq_stomp 3.7.17
  33. [ ] rabbitmq_top 3.7.17
  34. [ ] rabbitmq_tracing 3.7.17
  35. [ ] rabbitmq_trust_store 3.7.17
  36. [e*] rabbitmq_web_dispatch 3.7.17
  37. [ ] rabbitmq_web_mqtt 3.7.17
  38. [ ] rabbitmq_web_mqtt_examples 3.7.17
  39. [ ] rabbitmq_web_stomp 3.7.17
  40. [ ] rabbitmq_web_stomp_examples 3.7.17

可以看见 rabbitmq_delayed_message_exchange 已经启动
退出容器,并重启 RabbitMQ 即可

  1. bash-5.0# exit
  2. [root@vdevops /]# docker restart 73c64

重启后,前往 RabbitMQ web 控制台中,在创建交换机的选项中我们能发现多了一项类型:x-delayed-message

RabbitMQ使用插件方式实现延迟队列 - 图1web 控制台
安装插件成功。

3.使用

结构图

RabbitMQ使用插件方式实现延迟队列 - 图2

定义交换机,队列,及其绑定

  1. /**
  2. * 定义延迟交换机
  3. * 使用 CustomExchange 方式创建自定义的交换机,类型为我们的 - 延迟交换机
  4. */
  5. @Bean
  6. public CustomExchange xDelayExchange() {
  7. String exchangeName = "x_delay_exchange";
  8. Map<String, Object> args = new HashMap<String, Object>(1);
  9. // 这里使用直连方式的路由,如果想使用不同的路由行为,可以修改,如 topic
  10. args.put("x-delayed-type", "direct");
  11. return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
  12. }
  13. @Bean
  14. public Queue delayQueueC() {
  15. String queueName = "delay_queue_C";
  16. return new Queue(queueName, true, false, false);
  17. }
  18. @Bean
  19. public Binding bindingDelayExchange() {
  20. String routingKey = "bind.delay.C";
  21. return BindingBuilder.bind(delayQueueC()).to(xDelayExchange()).with(routingKey).noargs();
  22. }

消费者

  1. @RabbitListener(queues = "delay_queue_C")
  2. public void receiverC(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
  3. System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",延迟队列 - dlx_queue_C 收到消息:" + msg);
  4. channel.basicAck(deliveryTag, false);
  5. }

生产者

  1. @Test
  2. public void demo_delay_plugins() throws Exception {
  3. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  4. String exchange = "x_delay_exchange";
  5. String routingKey = "bind.delay.C";
  6. String msg = "我是第一条消息";
  7. Integer delayTime = 6000;
  8. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + " 开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
  9. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyDelayMessagePostProcessor(delayTime));
  10. msg = "我是第二条消息";
  11. delayTime = 3000;
  12. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + " 开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
  13. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyDelayMessagePostProcessor(delayTime));
  14. Thread.sleep(30000L);
  15. }

MyDelayMessagePostProcessor

  1. class MyDelayMessagePostProcessor implements MessagePostProcessor {
  2. // 延迟时间 毫秒
  3. private Integer delayTime;
  4. MyDelayMessagePostProcessor(Integer delayTime) {
  5. this.delayTime = delayTime;
  6. }
  7. @Override
  8. public Message postProcessMessage(Message message) throws AmqpException {
  9. // 设置延迟时间
  10. message.getMessageProperties().setDelay(delayTime);
  11. // 两种方式设置延迟时间同样
  12. // message.getMessageProperties().getHeaders().put("x-delay", delayTime);
  13. return message;
  14. }
  15. }

输出

  1. 当前时间:2020-09-28 21:10:20 开始发送消息:我是第一条消息 延迟的时间为:6000
  2. 当前时间:2020-09-28 21:10:20 开始发送消息:我是第二条消息 延迟的时间为:3000
  3. 当前时间:2020-09-28 21:10:23,延迟队列 - dlx_queue_C 收到消息:我是第一条消息
  4. 当前时间:2020-09-28 21:10:26,延迟队列 - dlx_queue_C 收到消息:我是第二条消息

可以发现使用延迟队列插件解决了消息阻塞问题。

4.局限性

  1. 由于 “x-delayed-type” 参数,可以使用此交换机来代替其他交换机,因为 “x-delayed-message” 交换机只是充当代理,可能会对性能产生影响,比实际使用的基本交换机要慢。
  2. 使用 Erlang 的定时器,所以有延时时长:0<=n<=(2^32)-1 ,单位毫秒。
  3. 目前这个插件的设计并不适合大量延迟消息的情况(例如100条数千条或数百万条)。详见 #72](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)

关闭该插件使用

  1. rabbitmq-plugins disable rabbitmq_delayed_message_exchange

5.总结

DLX + TTLDelayed Message 插件这两种 RabbitMQ 延迟消息解决方案都有一定的局限性。
如果你的消息 TTL 是相同的,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。
如果你的消息 TTL 过期值是可变的,可以尝试使用 Delayed Message 插件,对于某些应用而言它可能很好用,对于那些可能会达到高容量延迟消息的应用而言,则不是很友好。