踩坑:mqurl=amqp://username:password@host:port/vhost,创建的vhost名称是什么就写什么,不管vhost自身有没有带/
1. amqp
go get github.com/streadway/amqp
2. 自定义结构体
/*** @Author: cyj19* @Date: 2022/5/17 10:49*/package rabbitmqimport ("context""github.com/streadway/amqp")type RabbitMQ struct {Ctx context.ContextConn *amqp.Connection // 连接Channel *amqp.Channel // 管道QueueName string // 队列名称Exchange string // 交换机RoutingKey string // 路由keyMQUrl string // mq连接}func NewRabbitMQ(ctx context.Context, url, queueName, exchange, key string) (*RabbitMQ, error) {var err errorr := &RabbitMQ{Ctx: ctx,QueueName: queueName,Exchange: exchange,RoutingKey: key,MQUrl: url,}r.Conn, err = amqp.Dial(url)if err != nil {return nil, err}r.Channel, err = r.Conn.Channel()if err != nil {return nil, err}return r, nil}// Close 释放资源func (mq *RabbitMQ) Close() {mq.Conn.Close()mq.Channel.Close()}func (mq RabbitMQ) InitQueue() error {// 声明队列_, err := mq.Channel.QueueDeclare(mq.QueueName,true, // 是否持久化false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认nil, //额外属性)return err}func (mq RabbitMQ) InitExchange() error {// 声明交换机err := mq.Channel.ExchangeDeclare(mq.Exchange,"topic", //exchange type:一般用fanout、direct、topictrue, // 是否持久化false, //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)false, //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认nil, // 额外属性)return err}func (mq *RabbitMQ) InitBind() error {// 建立Binding(可建立多个绑定关系)err := mq.Channel.QueueBind(mq.QueueName, // 绑定的队列名称mq.RoutingKey, // 用于消息路由分发的keymq.Exchange, // 绑定的exchange名false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认nil, // 额外属性)return err}func (mq RabbitMQ) Public(msg amqp.Publishing) error {err := mq.Channel.Publish(mq.Exchange, mq.RoutingKey, false, false, msg)return err}
/*** @Author: cyj19* @Date: 2022/5/18 17:11*/package rabbitmqimport ("github.com/streadway/amqp""time")type PublicMessage struct {StartTime time.TimeMsg amqp.Publishing}
3. 生产者
/*** @Author: cyj19* @Date: 2022/5/17 11:35*/package mainimport ("context""day20220517-02/pkg/rabbitmq""github.com/rs/xid""github.com/streadway/amqp""log""time")func main() {// 声明生成者url := "amqp://admin:admin@localhost:5672//go-mq"mq, err := rabbitmq.NewRabbitMQ(context.Background(), url, "go-queue", "go-exchange", "go-key")if err != nil {log.Fatalln(err)}defer mq.Close()err = mq.InitQueue()if err != nil {log.Fatalln(err)}err = mq.InitExchange()if err != nil {log.Fatalln(err)}err = mq.InitBind()if err != nil {log.Fatalln(err)}err = mq.Channel.Confirm(false)if err != nil {log.Fatalln(err)}confirm := mq.Channel.NotifyPublish(make(chan amqp.Confirmation))ticker := time.NewTicker(20 * time.Second)defer func() {ticker.Stop()}()deliveryMap := make(map[uint64]*rabbitmq.PublicMessage)var deliveryTag uint64 = 1var pMsg *rabbitmq.PublicMessagego func() {for i := 0; i < 10; i++ {pMsg = &rabbitmq.PublicMessage{StartTime: time.Now(),Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象MessageId: xid.New().String(), // 消息IDContentType: "text/plain", // 消息内容的类型Body: []byte("hello cyj19"), // 消息内容},}// 4.发送消息errv := mq.Public(pMsg.Msg)if errv != nil {return}deliveryMap[deliveryTag] = pMsgdeliveryTag++}}()for {select {case <-mq.Ctx.Done():returncase c, ok := <-confirm:if !ok {log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")return}log.Println("RabbitMQ ack")pMsg = deliveryMap[c.DeliveryTag]// fmt.Println("DeliveryTag:", c.DeliveryTag)delete(deliveryMap, c.DeliveryTag)case <-ticker.C:now := time.Now()// 遍历消息表,for key := range deliveryMap {pMsg = deliveryMap[key]if pMsg != nil {// 消息超时还没收到ack,重发一次消息if now.Sub(pMsg.StartTime.Add(10*time.Second)) > 0 {delete(deliveryMap, key)mq.Public(pMsg.Msg)}}}}}}
4. 消费者
package mainfunc main(){// 1. 初始化MQctx := context.Background()url := "amqp://username:password@127.0.0.1:5672/hello-mq"mq, err := rabbit.NewRabbitMQ(ctx, url, "hello-queue", "hello-exchange", "hello-key")if err != nil {log.Fatalln(err)}defer mq.Close()err = mq.InitQueue()if err != nil {log.Fatalln(err)}// 3.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息msgChan, err := mq.Channel.Consume(mq.QueueName, // 队列名"", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略false, // 是否自动应答false, // 是否排他false, // 是否接收只同一个连接中的消息,若为true,则只能接收一个conn中发送的消息false, // 队列消费是否阻塞nil, // 额外属性)if err != nil {log.Println("获取消息失败", err)return}// 优雅退出quit := make(chan os.Signal, 1)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)for msg := range msgChan {// 消费者对重复消息进行过滤,使用redis存储消息ok, err := rdb.Exists(ctx, msg.MessageId).Result()if err != nil {log.Println(err)msg.Ack(false) // 主动应答continue}if ok == 1 {log.Printf("message:%s does consumed", msg.MessageId)msg.Ack(false) // 主动应答continue}// To DO ...log.Println(string(msg.Body))// 加入到redisrdb.Set(ctx, msg.MessageId, "", 10*time.Minute)msg.Ack(false) // 主动应答}<-quit}
