simple 模式是RabbitMQ 里最基础的一种模式,其他模式是类似于 simple 模式,参数配置变化,发送和接受者稍作改动。
模型图
simple 模式的模型图很简单。
生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。
队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。
消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。
这里直接给出代码, 代码里有对应的注释
新建 rabbitmq.go
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
// 连接信息
const MQURL = "amqp://guest:guest@localhost:5672/"
// rabbitMQ 连接体
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind key 名称
MqUrl string // 连接信息
}
// 创建RabbitMQ结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}
}
// 断开 channel 和 connection
func (r *RabbitMQ) Destroy() {
r.channel.Close()
r.conn.Close()
}
// 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("[err]%s, [message]%s", err, message)
}
}
// 创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
rmq := NewRabbitMQ(queueName, "", "")
var err error
// 获取 connection
rmq.conn, err = amqp.Dial(rmq.MqUrl)
rmq.failOnErr(err, "创建连接错误")
rmq.channel, err = rmq.conn.Channel()
rmq.failOnErr(err, "创建连接错误")
return rmq
}
func (r *RabbitMQ) PublishSimple(message string) {
// 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建
// 保证队列存在, 消息能发送到队列中
_, err := r.channel.QueueDeclare(
// name
r.QueueName,
// durable, 是否持久化
false,
// autoDelete, 是否为自动化删除
false,
// exclusive, 是否具有排他性
false,
// noWait, 是否阻塞
false,
// args,
nil,
)
if err != nil {
fmt.Println(err)
}
// 2.发送消息到队列中
r.channel.Publish(
r.Exchange,
r.QueueName,
// 如果为true, 根据 exchange 类型和 routekey 规则,
// 如果无法找到该队列,那么会把发送的消息返回给发送者。
false,
// 如果为ture时, 当 exchange 发送消息到队列后发现队列上没有绑定消费者,
// 这会把消息返回给消费者。
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
func (r *RabbitMQ) ConsumeSimple() {
// 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建
// 保证队列存在, 消息能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
// 是否持久化
false,
// 是否为自动化删除
false,
// 是否具有排他性
false,
// 是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
msgs, err := r.channel.Consume(
r.QueueName,
// 用来区分多个消费者
"",
// 是否自动应答, 默认为true; 为false 需要手动实现反馈回调
true,
// 是否具有排他性,
false,
// 如果设置为true, 表示不能将同一个conenction中发送的消息传递给这个connection中的消费者
false,
// 队列消费是否阻塞, false 表示阻塞。
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
// 启动协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
log.Printf("Receive a message: %s", d.Body)
}
}()
log.Printf("[*] Waiting for message. To exit to press CTRL+C")
<-forever
}
我来测试下
send.go
package main
import (
"fmt"
"github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
)
func main() {
rmq := rabbitmq.NewRabbitMQSimple("simple")
rmq.PublishSimple("hello this is at test!")
fmt.Println("发送成功")
}
receive.go
package main
import (
"fmt"
"github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
)
func main() {
rmq := rabbitmq.NewRabbitMQSimple("simple")
rmq.ConsumeSimple()
fmt.Println("发送成功")
}