1. 死信
所谓死信指的是无法被正常消费的消息。我们把存放这种消息的队列称为死信队列(实际上它只是普通队列),用于路由死信的交换机称为死信交换机(实际上只是普通交换机)。
当消息符合以下的一个条件时,将会称为死信:
- 消息被拒绝,不重新放回队列(使用
basic.reject/basic.nack方法拒绝,并且方法的参数requeue=false) - 消息的TTL(生存时间)过期
- 队列达到最大长度
2. 应用场景
当消息无法被正常消费或者消息发生异常时,把异常的消息置为死信,分发到死信队列。另外启动死信消费者来消费死信队列中的消息。
3. 实现
3.1 生产者
package mainimport ("github.com/rs/xid""github.com/streadway/amqp""log")const (DeadExchange = "deal.letter.exchange"DeadQueue = "deal.letter.queue"DeadRoutingKey = "deal")func main() {// 声明生产者url := "amqp://admin:admin@localhost:5672//go-mq"conn, err := amqp.Dial(url)fatalln(err)defer conn.Close()mqChan, err := conn.Channel()fatalln(err)defer mqChan.Close()// 声明队列_, err = mqChan.QueueDeclare(mq.QueueName,true, false, false, false,// 为业务队列配置死信amqp.Table{"x-message-ttl": 5000, // 消息过期时间,毫秒"x-dead-letter-exchange": DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": DeadRoutingKey, // 指定死信routing-key},)fatalln(err)// 声明交换机err = mqChan.ExchangeDeclare(mq.Exchange, "direct", true, false, false, false, nil)fatalln(err)// 建立Binding(可建立多个绑定关系)err = mqChan.QueueBind(mq.QueueName, mq.RoutingKey, mq.Exchange, false, nil)fatalln(err)// 声明死信队列_, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)fatalln(err)// 声明死信交换机err = mqChan.ExchangeDeclare(DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil)fatalln(err)// 建立绑定关系err = mq.Channel.QueueBind(DeadQueue, DeadRoutingKey, DeadExchange, false, nil)fatalln(err)pMsg = &rabbitmq.PublicMessage{StartTime: time.Now(),Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象MessageId: xid.New().String(), // 消息IDContentType: "text/plain", // 消息内容的类型Body: []byte("hello cyj19"), // 消息内容},}// 4.发送消息errv := mq.Public(pMsg.Msg)if errv != nil {return}}func fatalln(err error) {if err != nil {log.Fatalln(err)}}
3.2 死信消费者
package mainimport ("github.com/streadway/amqp""log")const (DeadExchange = "deal.letter.exchange"DeadQueue = "deal.letter.queue"DeadRoutingKey = "deal")func main() {// 声明死信消费者url := "amqp://admin:admin@localhost:5672//go-mq"conn, err := amqp.Dial(url)fatalln(err)defer conn.Close()mqChan, err := conn.Channel()fatalln(err)defer mqChan.Close()// 声明死信队列_, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)fatalln(err)// 消费死信msgChan, err := mqChan.Consume(DeadQueue, "", false, false, false, false, nil)fatalln(err)forever := make(chan struct{})go func() {for msg := range msgChan {// TO DO ...log.Println("接收到死信消息:", msg.Body)msg.Ack(false)}}()<-forever}func fatalln(err error) {if err != nil {log.Fatalln(err)}}
