消息延迟原理
使用前必须要完成的设置
1.必须安装消息队列延迟插件
# 插件安装详细参考地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange# 主要步骤1.下载插件,选择的插件版本与 rabbitmq 版本保持一致,下载xxx.ez 文件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases2.将插件复制到 rabbitmq 插件安装目录// 如果是docker启动的容器,容器根目录有个 /plugins 软连接,将插件复制在此处// 如果是其他方式安装的 rabbitmq,插件目录请自行查找3.安装插件(以docker容器为例)docker exec -it 你的 rabbitmq 容器名称 rabbitmq-plugins enable rabbitmq_management4.查看已经安装的插件docker exec -it rabbitmq rabbitmq-plugins list
插件安装完成如下:
2.延迟消息队列为了消息在指定时间之后,能够送达到消费者端(客户端), 消息会被持久化存储。
3.延迟消息队列的使用与即时消息队列的使用基本一致,只不过创建对象时,需要多传递一个参数。
4.延迟消息队列依赖的交换机必须手动在服务端创建,我们以 publish/subscribe 模式为例截图说明
延迟消息队列特点:
发送消息时可以指定多少毫秒之后发送,特别适合去处理未来某个时间点的业务,例如:订单在指定时间内没有付款,则自动关闭;指定在某个节日(从发送时计算,多少秒之后)统一向客户发送一些优惠活动信息等。
延迟消息队列支持的模式:
publish/subscribe 、routing 、topics
rabbitmq 消息队列的模式划分的很细致,但是总结起来其实就是2种:
1.publish/subscribe 模式可以代替 hello_world、 work_queue模式。
2.topics 模式也可以代替 routing模式,但是 routing模式也非常灵活,因此也被广泛使用。
publish/subscribe模式
创建生产者、消费者(客户端)时相比即时发送的消息队列,多传递一个参数即可。
1 生产者代码
// 3.PublishSubscribe 发布、订阅模式模式func TestRabbitMqPublishSubscribeProducer(t *testing.T) {// 创建客户端时传递参数:publish_subscribe.SetProdMsgDelayParams(true) 即可producer, err := publish_subscribe.CreateProducer(publish_subscribe.SetProdMsgDelayParams(true))if err != nil {t.Errorf("WorkQueue 单元测试未通过。%s\n", err.Error())os.Exit(1)}var res boolfor i := 0; i < 10; i++ {str := fmt.Sprintf("%d_PublishSubscribe开始发送消息测试", i+1)// 参数解释:// 参数一: 需要发送的消息// 参数二:延迟毫秒,只有延迟模式才有效。res = producer.Send(str, 10000)//time.Sleep(time.Second * 2)}producer.Close() // 消息投递结束,必须关闭连接if res {fmt.Printf("消息发送OK")} else {t.Errorf("PublishSubscribe 模式消息发送失败")}}
2 消费者端代码
创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可
func TestRabbitMqPublishSubscribeConsumer(t *testing.T) {// 创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可consumer, err := publish_subscribe.CreateConsumer(publish_subscribe.SetConsMsgDelayParams(true))if err != nil {t.Errorf("PublishSubscribe单元测试未通过。%s\n", err.Error())os.Exit(1)}consumer.OnConnectionError(func(err *amqp.Error) {log.Fatal(my_errors.ErrorsRabbitMqReconnectFail + "\n" + err.Error())})consumer.Received(func(receivedData string) {fmt.Printf("PublishSubscribe回调函数处理消息:--->%s\n", receivedData)})}
参数清单一览
| 消息队列模式 | 生产者对象创建时的参数 | 消费者对象创建时的参数 |
|---|---|---|
| publish/subscribe | publish_subscribe.SetProdMsgDelayParams(true) | publish_subscribe.SetConsMsgDelayParams(true) |
| Routing | routing.SetProdMsgDelayParams(true) | routing.SetConsMsgDelayParams(true) |
| topics | topics.SetProdMsgDelayParams(true) | topics.SetConsMsgDelayParams(true) |
避坑指南
1.生产者创建的是延迟消息队列模式时,消费者也必须使用延迟消息队列模式的客户端处理。
2.延迟消息队列之所以能延迟意味着消息会被持久化,性能没有即时模式高,因此如果业务没有必要使用延迟消息队列模式,请不要将消息发送的延迟时间设置为 0 毫秒,使用延迟消息队列模式发送即时消息。
3.必须保证开启了消息延迟插件。
4.必须保证延迟交换机(参见前文截图)手动在服务端创建,否则可能报错:
Exception (504) Reason: "channel/connection is not open"
