踩坑:mqurl=amqp://username:password@host:port/vhost
,创建的vhost名称是什么就写什么,不管vhost自身有没有带/
1. amqp
go get github.com/streadway/amqp
2. 自定义结构体
/**
* @Author: cyj19
* @Date: 2022/5/17 10:49
*/
package rabbitmq
import (
"context"
"github.com/streadway/amqp"
)
type RabbitMQ struct {
Ctx context.Context
Conn *amqp.Connection // 连接
Channel *amqp.Channel // 管道
QueueName string // 队列名称
Exchange string // 交换机
RoutingKey string // 路由key
MQUrl string // mq连接
}
func NewRabbitMQ(ctx context.Context, url, queueName, exchange, key string) (*RabbitMQ, error) {
var err error
r := &RabbitMQ{
Ctx: ctx,
QueueName: queueName,
Exchange: exchange,
RoutingKey: key,
MQUrl: url,
}
r.Conn, err = amqp.Dial(url)
if err != nil {
return nil, err
}
r.Channel, err = r.Conn.Channel()
if err != nil {
return nil, err
}
return r, nil
}
// Close 释放资源
func (mq *RabbitMQ) Close() {
mq.Conn.Close()
mq.Channel.Close()
}
func (mq RabbitMQ) InitQueue() error {
// 声明队列
_, err := mq.Channel.QueueDeclare(
mq.QueueName,
true, // 是否持久化
false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
nil, //额外属性
)
return err
}
func (mq RabbitMQ) InitExchange() error {
// 声明交换机
err := mq.Channel.ExchangeDeclare(
mq.Exchange,
"topic", //exchange type:一般用fanout、direct、topic
true, // 是否持久化
false, //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
false, //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
nil, // 额外属性
)
return err
}
func (mq *RabbitMQ) InitBind() error {
// 建立Binding(可建立多个绑定关系)
err := mq.Channel.QueueBind(
mq.QueueName, // 绑定的队列名称
mq.RoutingKey, // 用于消息路由分发的key
mq.Exchange, // 绑定的exchange名
false, // 是否等待服务器确认,为 true 时,无需等待服务器的确认
nil, // 额外属性
)
return err
}
func (mq RabbitMQ) Public(msg amqp.Publishing) error {
err := mq.Channel.Publish(mq.Exchange, mq.RoutingKey, false, false, msg)
return err
}
/**
* @Author: cyj19
* @Date: 2022/5/18 17:11
*/
package rabbitmq
import (
"github.com/streadway/amqp"
"time"
)
type PublicMessage struct {
StartTime time.Time
Msg amqp.Publishing
}
3. 生产者
/**
* @Author: cyj19
* @Date: 2022/5/17 11:35
*/
package main
import (
"context"
"day20220517-02/pkg/rabbitmq"
"github.com/rs/xid"
"github.com/streadway/amqp"
"log"
"time"
)
func main() {
// 声明生成者
url := "amqp://admin:admin@localhost:5672//go-mq"
mq, err := rabbitmq.NewRabbitMQ(context.Background(), url, "go-queue", "go-exchange", "go-key")
if err != nil {
log.Fatalln(err)
}
defer mq.Close()
err = mq.InitQueue()
if err != nil {
log.Fatalln(err)
}
err = mq.InitExchange()
if err != nil {
log.Fatalln(err)
}
err = mq.InitBind()
if err != nil {
log.Fatalln(err)
}
err = mq.Channel.Confirm(false)
if err != nil {
log.Fatalln(err)
}
confirm := mq.Channel.NotifyPublish(make(chan amqp.Confirmation))
ticker := time.NewTicker(20 * time.Second)
defer func() {
ticker.Stop()
}()
deliveryMap := make(map[uint64]*rabbitmq.PublicMessage)
var deliveryTag uint64 = 1
var pMsg *rabbitmq.PublicMessage
go func() {
for i := 0; i < 10; i++ {
pMsg = &rabbitmq.PublicMessage{
StartTime: time.Now(),
Msg: amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
MessageId: xid.New().String(), // 消息ID
ContentType: "text/plain", // 消息内容的类型
Body: []byte("hello cyj19"), // 消息内容
},
}
// 4.发送消息
errv := mq.Public(pMsg.Msg)
if errv != nil {
return
}
deliveryMap[deliveryTag] = pMsg
deliveryTag++
}
}()
for {
select {
case <-mq.Ctx.Done():
return
case c, ok := <-confirm:
if !ok {
log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
return
}
log.Println("RabbitMQ ack")
pMsg = deliveryMap[c.DeliveryTag]
// fmt.Println("DeliveryTag:", c.DeliveryTag)
delete(deliveryMap, c.DeliveryTag)
case <-ticker.C:
now := time.Now()
// 遍历消息表,
for key := range deliveryMap {
pMsg = deliveryMap[key]
if pMsg != nil {
// 消息超时还没收到ack,重发一次消息
if now.Sub(pMsg.StartTime.Add(10*time.Second)) > 0 {
delete(deliveryMap, key)
mq.Public(pMsg.Msg)
}
}
}
}
}
}
4. 消费者
package main
func main(){
// 1. 初始化MQ
ctx := context.Background()
url := "amqp://username:password@127.0.0.1:5672/hello-mq"
mq, err := rabbit.NewRabbitMQ(ctx, url, "hello-queue", "hello-exchange", "hello-key")
if err != nil {
log.Fatalln(err)
}
defer mq.Close()
err = mq.InitQueue()
if err != nil {
log.Fatalln(err)
}
// 3.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
msgChan, err := mq.Channel.Consume(
mq.QueueName, // 队列名
"", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
false, // 是否自动应答
false, // 是否排他
false, // 是否接收只同一个连接中的消息,若为true,则只能接收一个conn中发送的消息
false, // 队列消费是否阻塞
nil, // 额外属性
)
if err != nil {
log.Println("获取消息失败", err)
return
}
// 优雅退出
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
for msg := range msgChan {
// 消费者对重复消息进行过滤,使用redis存储消息
ok, err := rdb.Exists(ctx, msg.MessageId).Result()
if err != nil {
log.Println(err)
msg.Ack(false) // 主动应答
continue
}
if ok == 1 {
log.Printf("message:%s does consumed", msg.MessageId)
msg.Ack(false) // 主动应答
continue
}
// To DO ...
log.Println(string(msg.Body))
// 加入到redis
rdb.Set(ctx, msg.MessageId, "", 10*time.Minute)
msg.Ack(false) // 主动应答
}
<-quit
}