1. 死信

所谓死信指的是无法被正常消费的消息。我们把存放这种消息的队列称为死信队列(实际上它只是普通队列),用于路由死信的交换机称为死信交换机(实际上只是普通交换机)。
当消息符合以下的一个条件时,将会称为死信:

  • 消息被拒绝,不重新放回队列(使用basic.reject/basic.nack方法拒绝,并且方法的参数requeue=false)
  • 消息的TTL(生存时间)过期
  • 队列达到最大长度

2. 应用场景

当消息无法被正常消费或者消息发生异常时,把异常的消息置为死信,分发到死信队列。另外启动死信消费者来消费死信队列中的消息。
deadletter.png

3. 实现

3.1 生产者

  1. package main
  2. import (
  3. "github.com/rs/xid"
  4. "github.com/streadway/amqp"
  5. "log"
  6. )
  7. const (
  8. DeadExchange = "deal.letter.exchange"
  9. DeadQueue = "deal.letter.queue"
  10. DeadRoutingKey = "deal"
  11. )
  12. func main() {
  13. // 声明生产者
  14. url := "amqp://admin:admin@localhost:5672//go-mq"
  15. conn, err := amqp.Dial(url)
  16. fatalln(err)
  17. defer conn.Close()
  18. mqChan, err := conn.Channel()
  19. fatalln(err)
  20. defer mqChan.Close()
  21. // 声明队列
  22. _, err = mqChan.QueueDeclare(mq.QueueName,true, false, false, false,
  23. // 为业务队列配置死信
  24. amqp.Table{
  25. "x-message-ttl": 5000, // 消息过期时间,毫秒
  26. "x-dead-letter-exchange": DeadExchange, // 指定死信交换机
  27. "x-dead-letter-routing-key": DeadRoutingKey, // 指定死信routing-key
  28. },
  29. )
  30. fatalln(err)
  31. // 声明交换机
  32. err = mqChan.ExchangeDeclare(mq.Exchange, "direct", true, false, false, false, nil)
  33. fatalln(err)
  34. // 建立Binding(可建立多个绑定关系)
  35. err = mqChan.QueueBind(mq.QueueName, mq.RoutingKey, mq.Exchange, false, nil)
  36. fatalln(err)
  37. // 声明死信队列
  38. _, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)
  39. fatalln(err)
  40. // 声明死信交换机
  41. err = mqChan.ExchangeDeclare(DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil)
  42. fatalln(err)
  43. // 建立绑定关系
  44. err = mq.Channel.QueueBind(DeadQueue, DeadRoutingKey, DeadExchange, false, nil)
  45. fatalln(err)
  46. pMsg = &rabbitmq.PublicMessage{
  47. StartTime: time.Now(),
  48. Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
  49. MessageId: xid.New().String(), // 消息ID
  50. ContentType: "text/plain", // 消息内容的类型
  51. Body: []byte("hello cyj19"), // 消息内容
  52. },
  53. }
  54. // 4.发送消息
  55. errv := mq.Public(pMsg.Msg)
  56. if errv != nil {
  57. return
  58. }
  59. }
  60. func fatalln(err error) {
  61. if err != nil {
  62. log.Fatalln(err)
  63. }
  64. }

3.2 死信消费者

  1. package main
  2. import (
  3. "github.com/streadway/amqp"
  4. "log"
  5. )
  6. const (
  7. DeadExchange = "deal.letter.exchange"
  8. DeadQueue = "deal.letter.queue"
  9. DeadRoutingKey = "deal"
  10. )
  11. func main() {
  12. // 声明死信消费者
  13. url := "amqp://admin:admin@localhost:5672//go-mq"
  14. conn, err := amqp.Dial(url)
  15. fatalln(err)
  16. defer conn.Close()
  17. mqChan, err := conn.Channel()
  18. fatalln(err)
  19. defer mqChan.Close()
  20. // 声明死信队列
  21. _, err = mqChan.QueueDeclare(DeadQueue, true, false, false, false, nil)
  22. fatalln(err)
  23. // 消费死信
  24. msgChan, err := mqChan.Consume(DeadQueue, "", false, false, false, false, nil)
  25. fatalln(err)
  26. forever := make(chan struct{})
  27. go func() {
  28. for msg := range msgChan {
  29. // TO DO ...
  30. log.Println("接收到死信消息:", msg.Body)
  31. msg.Ack(false)
  32. }
  33. }()
  34. <-forever
  35. }
  36. func fatalln(err error) {
  37. if err != nil {
  38. log.Fatalln(err)
  39. }
  40. }