1. 死信
所谓死信指的是无法被正常消费的消息。我们把存放这种消息的队列称为死信队列(实际上它只是普通队列),用于路由死信的交换机称为死信交换机(实际上只是普通交换机)。
当消息符合以下的一个条件时,将会称为死信:
- 消息被拒绝,不重新放回队列(使用
basic.reject/basic.nack
方法拒绝,并且方法的参数requeue=false) - 消息的TTL(生存时间)过期
- 队列达到最大长度
2. 应用场景
当消息无法被正常消费或者消息发生异常时,把异常的消息置为死信,分发到死信队列。另外启动死信消费者来消费死信队列中的消息。
3. 实现
3.1 生产者
package main
import (
"github.com/rs/xid"
"github.com/streadway/amqp"
"log"
)
const (
DeadExchange = "deal.letter.exchange"
DeadQueue = "deal.letter.queue"
DeadRoutingKey = "deal"
)
func main() {
// 声明生产者
url := "amqp://admin:admin@localhost:5672//go-mq"
conn, err := amqp.Dial(url)
fatalln(err)
defer conn.Close()
mqChan, err := conn.Channel()
fatalln(err)
defer mqChan.Close()
// 声明队列
_, err = mqChan.QueueDeclare(mq.QueueName,true, false, false, false,
// 为业务队列配置死信
amqp.Table{
"x-message-ttl": 5000, // 消息过期时间,毫秒
"x-dead-letter-exchange": DeadExchange, // 指定死信交换机
"x-dead-letter-routing-key": DeadRoutingKey, // 指定死信routing-key
},
)
fatalln(err)
// 声明交换机
err = mqChan.ExchangeDeclare(mq.Exchange, "direct", true, false, false, false, nil)
fatalln(err)
// 建立Binding(可建立多个绑定关系)
err = mqChan.QueueBind(mq.QueueName, mq.RoutingKey, mq.Exchange, false, nil)
fatalln(err)
// 声明死信队列
_, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)
fatalln(err)
// 声明死信交换机
err = mqChan.ExchangeDeclare(DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil)
fatalln(err)
// 建立绑定关系
err = mq.Channel.QueueBind(DeadQueue, DeadRoutingKey, DeadExchange, false, nil)
fatalln(err)
pMsg = &rabbitmq.PublicMessage{
StartTime: time.Now(),
Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
MessageId: xid.New().String(), // 消息ID
ContentType: "text/plain", // 消息内容的类型
Body: []byte("hello cyj19"), // 消息内容
},
}
// 4.发送消息
errv := mq.Public(pMsg.Msg)
if errv != nil {
return
}
}
func fatalln(err error) {
if err != nil {
log.Fatalln(err)
}
}
3.2 死信消费者
package main
import (
"github.com/streadway/amqp"
"log"
)
const (
DeadExchange = "deal.letter.exchange"
DeadQueue = "deal.letter.queue"
DeadRoutingKey = "deal"
)
func main() {
// 声明死信消费者
url := "amqp://admin:admin@localhost:5672//go-mq"
conn, err := amqp.Dial(url)
fatalln(err)
defer conn.Close()
mqChan, err := conn.Channel()
fatalln(err)
defer mqChan.Close()
// 声明死信队列
_, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)
fatalln(err)
// 消费死信
msgChan, err := mqChan.Consume(DeadQueue, "", false, false, false, false, nil)
fatalln(err)
forever := make(chan struct{})
go func() {
for msg := range msgChan {
// TO DO ...
log.Println("接收到死信消息:", msg.Body)
msg.Ack(false)
}
}()
<-forever
}
func fatalln(err error) {
if err != nil {
log.Fatalln(err)
}
}