踩坑:mqurl=amqp://username:password@host:port/vhost,创建的vhost名称是什么就写什么,不管vhost自身有没有带/

1. amqp

go get github.com/streadway/amqp

2. 自定义结构体

  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/17 10:49
  4. */
  5. package rabbitmq
  6. import (
  7. "context"
  8. "github.com/streadway/amqp"
  9. )
  10. type RabbitMQ struct {
  11. Ctx context.Context
  12. Conn *amqp.Connection // 连接
  13. Channel *amqp.Channel // 管道
  14. QueueName string // 队列名称
  15. Exchange string // 交换机
  16. RoutingKey string // 路由key
  17. MQUrl string // mq连接
  18. }
  19. func NewRabbitMQ(ctx context.Context, url, queueName, exchange, key string) (*RabbitMQ, error) {
  20. var err error
  21. r := &RabbitMQ{
  22. Ctx: ctx,
  23. QueueName: queueName,
  24. Exchange: exchange,
  25. RoutingKey: key,
  26. MQUrl: url,
  27. }
  28. r.Conn, err = amqp.Dial(url)
  29. if err != nil {
  30. return nil, err
  31. }
  32. r.Channel, err = r.Conn.Channel()
  33. if err != nil {
  34. return nil, err
  35. }
  36. return r, nil
  37. }
  38. // Close 释放资源
  39. func (mq *RabbitMQ) Close() {
  40. mq.Conn.Close()
  41. mq.Channel.Close()
  42. }
  43. func (mq RabbitMQ) InitQueue() error {
  44. // 声明队列
  45. _, err := mq.Channel.QueueDeclare(
  46. mq.QueueName,
  47. true, // 是否持久化
  48. false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
  49. false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
  50. false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
  51. nil, //额外属性
  52. )
  53. return err
  54. }
  55. func (mq RabbitMQ) InitExchange() error {
  56. // 声明交换机
  57. err := mq.Channel.ExchangeDeclare(
  58. mq.Exchange,
  59. "topic", //exchange type:一般用fanout、direct、topic
  60. true, // 是否持久化
  61. false, //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
  62. false, //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  63. false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
  64. nil, // 额外属性
  65. )
  66. return err
  67. }
  68. func (mq *RabbitMQ) InitBind() error {
  69. // 建立Binding(可建立多个绑定关系)
  70. err := mq.Channel.QueueBind(
  71. mq.QueueName, // 绑定的队列名称
  72. mq.RoutingKey, // 用于消息路由分发的key
  73. mq.Exchange, // 绑定的exchange名
  74. false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
  75. nil, // 额外属性
  76. )
  77. return err
  78. }
  79. func (mq RabbitMQ) Public(msg amqp.Publishing) error {
  80. err := mq.Channel.Publish(mq.Exchange, mq.RoutingKey, false, false, msg)
  81. return err
  82. }
  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/18 17:11
  4. */
  5. package rabbitmq
  6. import (
  7. "github.com/streadway/amqp"
  8. "time"
  9. )
  10. type PublicMessage struct {
  11. StartTime time.Time
  12. Msg amqp.Publishing
  13. }

3. 生产者

  1. /**
  2. * @Author: cyj19
  3. * @Date: 2022/5/17 11:35
  4. */
  5. package main
  6. import (
  7. "context"
  8. "day20220517-02/pkg/rabbitmq"
  9. "github.com/rs/xid"
  10. "github.com/streadway/amqp"
  11. "log"
  12. "time"
  13. )
  14. func main() {
  15. // 声明生成者
  16. url := "amqp://admin:admin@localhost:5672//go-mq"
  17. mq, err := rabbitmq.NewRabbitMQ(context.Background(), url, "go-queue", "go-exchange", "go-key")
  18. if err != nil {
  19. log.Fatalln(err)
  20. }
  21. defer mq.Close()
  22. err = mq.InitQueue()
  23. if err != nil {
  24. log.Fatalln(err)
  25. }
  26. err = mq.InitExchange()
  27. if err != nil {
  28. log.Fatalln(err)
  29. }
  30. err = mq.InitBind()
  31. if err != nil {
  32. log.Fatalln(err)
  33. }
  34. err = mq.Channel.Confirm(false)
  35. if err != nil {
  36. log.Fatalln(err)
  37. }
  38. confirm := mq.Channel.NotifyPublish(make(chan amqp.Confirmation))
  39. ticker := time.NewTicker(20 * time.Second)
  40. defer func() {
  41. ticker.Stop()
  42. }()
  43. deliveryMap := make(map[uint64]*rabbitmq.PublicMessage)
  44. var deliveryTag uint64 = 1
  45. var pMsg *rabbitmq.PublicMessage
  46. go func() {
  47. for i := 0; i < 10; i++ {
  48. pMsg = &rabbitmq.PublicMessage{
  49. StartTime: time.Now(),
  50. Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
  51. MessageId: xid.New().String(), // 消息ID
  52. ContentType: "text/plain", // 消息内容的类型
  53. Body: []byte("hello cyj19"), // 消息内容
  54. },
  55. }
  56. // 4.发送消息
  57. errv := mq.Public(pMsg.Msg)
  58. if errv != nil {
  59. return
  60. }
  61. deliveryMap[deliveryTag] = pMsg
  62. deliveryTag++
  63. }
  64. }()
  65. for {
  66. select {
  67. case <-mq.Ctx.Done():
  68. return
  69. case c, ok := <-confirm:
  70. if !ok {
  71. log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
  72. return
  73. }
  74. log.Println("RabbitMQ ack")
  75. pMsg = deliveryMap[c.DeliveryTag]
  76. // fmt.Println("DeliveryTag:", c.DeliveryTag)
  77. delete(deliveryMap, c.DeliveryTag)
  78. case <-ticker.C:
  79. now := time.Now()
  80. // 遍历消息表,
  81. for key := range deliveryMap {
  82. pMsg = deliveryMap[key]
  83. if pMsg != nil {
  84. // 消息超时还没收到ack,重发一次消息
  85. if now.Sub(pMsg.StartTime.Add(10*time.Second)) > 0 {
  86. delete(deliveryMap, key)
  87. mq.Public(pMsg.Msg)
  88. }
  89. }
  90. }
  91. }
  92. }
  93. }

4. 消费者

  1. package main
  2. func main(){
  3. // 1. 初始化MQ
  4. ctx := context.Background()
  5. url := "amqp://username:password@127.0.0.1:5672/hello-mq"
  6. mq, err := rabbit.NewRabbitMQ(ctx, url, "hello-queue", "hello-exchange", "hello-key")
  7. if err != nil {
  8. log.Fatalln(err)
  9. }
  10. defer mq.Close()
  11. err = mq.InitQueue()
  12. if err != nil {
  13. log.Fatalln(err)
  14. }
  15. // 3.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
  16. msgChan, err := mq.Channel.Consume(
  17. mq.QueueName, // 队列名
  18. "", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
  19. false, // 是否自动应答
  20. false, // 是否排他
  21. false, // 是否接收只同一个连接中的消息,若为true,则只能接收一个conn中发送的消息
  22. false, // 队列消费是否阻塞
  23. nil, // 额外属性
  24. )
  25. if err != nil {
  26. log.Println("获取消息失败", err)
  27. return
  28. }
  29. // 优雅退出
  30. quit := make(chan os.Signal, 1)
  31. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  32. for msg := range msgChan {
  33. // 消费者对重复消息进行过滤,使用redis存储消息
  34. ok, err := rdb.Exists(ctx, msg.MessageId).Result()
  35. if err != nil {
  36. log.Println(err)
  37. msg.Ack(false) // 主动应答
  38. continue
  39. }
  40. if ok == 1 {
  41. log.Printf("message:%s does consumed", msg.MessageId)
  42. msg.Ack(false) // 主动应答
  43. continue
  44. }
  45. // To DO ...
  46. log.Println(string(msg.Body))
  47. // 加入到redis
  48. rdb.Set(ctx, msg.MessageId, "", 10*time.Minute)
  49. msg.Ack(false) // 主动应答
  50. }
  51. <-quit
  52. }