消息延迟原理

延迟原理.png

使用前必须要完成的设置

1.必须安装消息队列延迟插件

  1. # 插件安装详细参考地址:
  2. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  3. # 主要步骤
  4. 1.下载插件,选择的插件版本与 rabbitmq 版本保持一致,下载xxx.ez 文件
  5. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  6. 2.将插件复制到 rabbitmq 插件安装目录
  7. // 如果是docker启动的容器,容器根目录有个 /plugins 软连接,将插件复制在此处
  8. // 如果是其他方式安装的 rabbitmq,插件目录请自行查找
  9. 3.安装插件(以docker容器为例)
  10. docker exec -it 你的 rabbitmq 容器名称 rabbitmq-plugins enable rabbitmq_management
  11. 4.查看已经安装的插件
  12. docker exec -it rabbitmq rabbitmq-plugins list

插件安装完成如下:
delay_plugin.png
2.延迟消息队列为了消息在指定时间之后,能够送达到消费者端(客户端), 消息会被持久化存储。
3.延迟消息队列的使用与即时消息队列的使用基本一致,只不过创建对象时,需要多传递一个参数。
4.延迟消息队列依赖的交换机必须手动在服务端创建,我们以 publish/subscribe 模式为例截图说明
delay_1.png
delay_2.png

延迟消息队列特点:

发送消息时可以指定多少毫秒之后发送,特别适合去处理未来某个时间点的业务,例如:订单在指定时间内没有付款,则自动关闭;指定在某个节日(从发送时计算,多少秒之后)统一向客户发送一些优惠活动信息等。

延迟消息队列支持的模式:

publish/subscriberoutingtopics
rabbitmq 消息队列的模式划分的很细致,但是总结起来其实就是2种:
1.publish/subscribe 模式可以代替 hello_worldwork_queue模式。
2.topics 模式也可以代替 routing模式,但是 routing模式也非常灵活,因此也被广泛使用。

publish/subscribe模式

创建生产者、消费者(客户端)时相比即时发送的消息队列,多传递一个参数即可。

1 生产者代码

  1. // 3.PublishSubscribe 发布、订阅模式模式
  2. func TestRabbitMqPublishSubscribeProducer(t *testing.T) {
  3. // 创建客户端时传递参数:publish_subscribe.SetProdMsgDelayParams(true) 即可
  4. producer, err := publish_subscribe.CreateProducer(publish_subscribe.SetProdMsgDelayParams(true))
  5. if err != nil {
  6. t.Errorf("WorkQueue 单元测试未通过。%s\n", err.Error())
  7. os.Exit(1)
  8. }
  9. var res bool
  10. for i := 0; i < 10; i++ {
  11. str := fmt.Sprintf("%d_PublishSubscribe开始发送消息测试", i+1)
  12. // 参数解释:
  13. // 参数一: 需要发送的消息
  14. // 参数二:延迟毫秒,只有延迟模式才有效。
  15. res = producer.Send(str, 10000)
  16. //time.Sleep(time.Second * 2)
  17. }
  18. producer.Close() // 消息投递结束,必须关闭连接
  19. if res {
  20. fmt.Printf("消息发送OK")
  21. } else {
  22. t.Errorf("PublishSubscribe 模式消息发送失败")
  23. }
  24. }

2 消费者端代码

创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可

  1. func TestRabbitMqPublishSubscribeConsumer(t *testing.T) {
  2. // 创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可
  3. consumer, err := publish_subscribe.CreateConsumer(publish_subscribe.SetConsMsgDelayParams(true))
  4. if err != nil {
  5. t.Errorf("PublishSubscribe单元测试未通过。%s\n", err.Error())
  6. os.Exit(1)
  7. }
  8. consumer.OnConnectionError(func(err *amqp.Error) {
  9. log.Fatal(my_errors.ErrorsRabbitMqReconnectFail + "\n" + err.Error())
  10. })
  11. consumer.Received(func(receivedData string) {
  12. fmt.Printf("PublishSubscribe回调函数处理消息:--->%s\n", receivedData)
  13. })
  14. }

参数清单一览

消息队列模式 生产者对象创建时的参数 消费者对象创建时的参数
publish/subscribe publish_subscribe.SetProdMsgDelayParams(true) publish_subscribe.SetConsMsgDelayParams(true)
Routing routing.SetProdMsgDelayParams(true) routing.SetConsMsgDelayParams(true)
topics topics.SetProdMsgDelayParams(true) topics.SetConsMsgDelayParams(true)

避坑指南

1.生产者创建的是延迟消息队列模式时,消费者也必须使用延迟消息队列模式的客户端处理。
2.延迟消息队列之所以能延迟意味着消息会被持久化,性能没有即时模式高,因此如果业务没有必要使用延迟消息队列模式,请不要将消息发送的延迟时间设置为 0 毫秒,使用延迟消息队列模式发送即时消息。
3.必须保证开启了消息延迟插件。
4.必须保证延迟交换机(参见前文截图)手动在服务端创建,否则可能报错:

  1. Exception (504) Reason: "channel/connection is not open"