simple 模式是RabbitMQ 里最基础的一种模式,其他模式是类似于 simple 模式,参数配置变化,发送和接受者稍作改动。
模型图
simple 模式的模型图很简单。
生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。
队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。
消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。

这里直接给出代码, 代码里有对应的注释
新建 rabbitmq.go
package rabbitmqimport ("fmt""github.com/streadway/amqp""log")// 连接信息const MQURL = "amqp://guest:guest@localhost:5672/"// rabbitMQ 连接体type RabbitMQ struct {conn *amqp.Connectionchannel *amqp.ChannelQueueName string // 队列名称Exchange string // 交换机名称Key string // bind key 名称MqUrl string // 连接信息}// 创建RabbitMQ结构体实例func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}}// 断开 channel 和 connectionfunc (r *RabbitMQ) Destroy() {r.channel.Close()r.conn.Close()}// 错误处理函数func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("[err]%s, [message]%s", err, message)}}// 创建简单模式下RabbitMQ实例func NewRabbitMQSimple(queueName string) *RabbitMQ {rmq := NewRabbitMQ(queueName, "", "")var err error// 获取 connectionrmq.conn, err = amqp.Dial(rmq.MqUrl)rmq.failOnErr(err, "创建连接错误")rmq.channel, err = rmq.conn.Channel()rmq.failOnErr(err, "创建连接错误")return rmq}func (r *RabbitMQ) PublishSimple(message string) {// 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建// 保证队列存在, 消息能发送到队列中_, err := r.channel.QueueDeclare(// namer.QueueName,// durable, 是否持久化false,// autoDelete, 是否为自动化删除false,// exclusive, 是否具有排他性false,// noWait, 是否阻塞false,// args,nil,)if err != nil {fmt.Println(err)}// 2.发送消息到队列中r.channel.Publish(r.Exchange,r.QueueName,// 如果为true, 根据 exchange 类型和 routekey 规则,// 如果无法找到该队列,那么会把发送的消息返回给发送者。false,// 如果为ture时, 当 exchange 发送消息到队列后发现队列上没有绑定消费者,// 这会把消息返回给消费者。false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),},)}func (r *RabbitMQ) ConsumeSimple() {// 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建// 保证队列存在, 消息能发送到队列中_, err := r.channel.QueueDeclare(r.QueueName,// 是否持久化false,// 是否为自动化删除false,// 是否具有排他性false,// 是否阻塞false,nil,)if err != nil {fmt.Println(err)}msgs, err := r.channel.Consume(r.QueueName,// 用来区分多个消费者"",// 是否自动应答, 默认为true; 为false 需要手动实现反馈回调true,// 是否具有排他性,false,// 如果设置为true, 表示不能将同一个conenction中发送的消息传递给这个connection中的消费者false,// 队列消费是否阻塞, false 表示阻塞。false,nil,)if err != nil {fmt.Println(err)}forever := make(chan bool)// 启动协程处理消息go func() {for d := range msgs {// 实现我们要处理的逻辑函数log.Printf("Receive a message: %s", d.Body)}}()log.Printf("[*] Waiting for message. To exit to press CTRL+C")<-forever}
我来测试下
send.go
package mainimport ("fmt""github.com/WenkeZhou/flash-sale/pkg/rabbitmq")func main() {rmq := rabbitmq.NewRabbitMQSimple("simple")rmq.PublishSimple("hello this is at test!")fmt.Println("发送成功")}
receive.go
package mainimport ("fmt""github.com/WenkeZhou/flash-sale/pkg/rabbitmq")func main() {rmq := rabbitmq.NewRabbitMQSimple("simple")rmq.ConsumeSimple()fmt.Println("发送成功")}
