消息延迟原理
使用前必须要完成的设置
1.必须安装消息队列延迟插件
# 插件安装详细参考地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
# 主要步骤
1.下载插件,选择的插件版本与 rabbitmq 版本保持一致,下载xxx.ez 文件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2.将插件复制到 rabbitmq 插件安装目录
// 如果是docker启动的容器,容器根目录有个 /plugins 软连接,将插件复制在此处
// 如果是其他方式安装的 rabbitmq,插件目录请自行查找
3.安装插件(以docker容器为例)
docker exec -it 你的 rabbitmq 容器名称 rabbitmq-plugins enable rabbitmq_management
4.查看已经安装的插件
docker exec -it rabbitmq rabbitmq-plugins list
插件安装完成如下:
2.延迟消息队列为了消息在指定时间之后,能够送达到消费者端(客户端), 消息会被持久化存储。
3.延迟消息队列的使用与即时消息队列的使用基本一致,只不过创建对象时,需要多传递一个参数。
4.延迟消息队列依赖的交换机必须手动在服务端创建,我们以 publish/subscribe
模式为例截图说明
延迟消息队列特点:
发送消息时可以指定多少毫秒之后发送,特别适合去处理未来某个时间点的业务,例如:订单在指定时间内没有付款,则自动关闭;指定在某个节日(从发送时计算,多少秒之后)统一向客户发送一些优惠活动信息等。
延迟消息队列支持的模式:
publish/subscribe
、routing
、topics
rabbitmq 消息队列的模式划分的很细致,但是总结起来其实就是2种:
1.publish/subscribe
模式可以代替 hello_world
、 work_queue
模式。
2.topics
模式也可以代替 routing
模式,但是 routing
模式也非常灵活,因此也被广泛使用。
publish/subscribe
模式
创建生产者、消费者(客户端)时相比即时发送的消息队列,多传递一个参数即可。
1 生产者代码
// 3.PublishSubscribe 发布、订阅模式模式
func TestRabbitMqPublishSubscribeProducer(t *testing.T) {
// 创建客户端时传递参数:publish_subscribe.SetProdMsgDelayParams(true) 即可
producer, err := publish_subscribe.CreateProducer(publish_subscribe.SetProdMsgDelayParams(true))
if err != nil {
t.Errorf("WorkQueue 单元测试未通过。%s\n", err.Error())
os.Exit(1)
}
var res bool
for i := 0; i < 10; i++ {
str := fmt.Sprintf("%d_PublishSubscribe开始发送消息测试", i+1)
// 参数解释:
// 参数一: 需要发送的消息
// 参数二:延迟毫秒,只有延迟模式才有效。
res = producer.Send(str, 10000)
//time.Sleep(time.Second * 2)
}
producer.Close() // 消息投递结束,必须关闭连接
if res {
fmt.Printf("消息发送OK")
} else {
t.Errorf("PublishSubscribe 模式消息发送失败")
}
}
2 消费者端代码
创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可
func TestRabbitMqPublishSubscribeConsumer(t *testing.T) {
// 创建消费者端时传递参数:(publish_subscribe.SetConsMsgDelayParams(true) 即可
consumer, err := publish_subscribe.CreateConsumer(publish_subscribe.SetConsMsgDelayParams(true))
if err != nil {
t.Errorf("PublishSubscribe单元测试未通过。%s\n", err.Error())
os.Exit(1)
}
consumer.OnConnectionError(func(err *amqp.Error) {
log.Fatal(my_errors.ErrorsRabbitMqReconnectFail + "\n" + err.Error())
})
consumer.Received(func(receivedData string) {
fmt.Printf("PublishSubscribe回调函数处理消息:--->%s\n", receivedData)
})
}
参数清单一览
消息队列模式 | 生产者对象创建时的参数 | 消费者对象创建时的参数 |
---|---|---|
publish/subscribe | publish_subscribe.SetProdMsgDelayParams(true) | publish_subscribe.SetConsMsgDelayParams(true) |
Routing | routing.SetProdMsgDelayParams(true) | routing.SetConsMsgDelayParams(true) |
topics | topics.SetProdMsgDelayParams(true) | topics.SetConsMsgDelayParams(true) |
避坑指南
1.生产者创建的是延迟消息队列模式时,消费者也必须使用延迟消息队列模式的客户端处理。
2.延迟消息队列之所以能延迟意味着消息会被持久化,性能没有即时模式高,因此如果业务没有必要使用延迟消息队列模式,请不要将消息发送的延迟时间设置为 0 毫秒,使用延迟消息队列模式发送即时消息。
3.必须保证开启了消息延迟插件。
4.必须保证延迟交换机(参见前文截图)手动在服务端创建,否则可能报错:
Exception (504) Reason: "channel/connection is not open"