“发布/订阅”模式,将分发一个消息给多个消费者(consumers)。
交换机(Exchanges)
前面的教程中,我们发送消息到队列并从中取出消息。现在是时候介绍RabbitMQ中完整的消息模型了。
让我们简单的概括一下之前的教程:
- 发布者(producer)是发布消息的应用程序。
- 队列(queue)用于消息存储的缓冲。
- 消费者(consumer)是接收消息的应用程序。
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
交换机类型
有几个可供选择的交换机类型:direct, topic, headers和fanout。
实现
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
rmq := NewRabbitMQ("", exchangeName, "")
var err error
// 获取 connection
rmq.conn, err = amqp.Dial(rmq.MqUrl)
rmq.failOnErr(err, "创建连接错误")
rmq.channel, err = rmq.conn.Channel()
rmq.failOnErr(err, "创建连接错误")
return rmq
}
// 订阅生成模式
func (r *RabbitMQ) PublishPub(message string) {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout",
true,
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare exchange.")
// 2.发送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
r.failOnErr(err, "Failed to publish message.")
}
func (r *RabbitMQ) ReceiveSub() {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
// 交换机类型
"fanout",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare exchange")
// 2.试探创建队列
q, err := r.channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to decalre queue")
// 绑定队列到 exchange中
err = r.channel.QueueBind(
q.Name,
// 在pub/sub模式下, 这里的key必须为空
"",
r.Exchange,
false,
nil,
)
r.failOnErr(err, "Failed to decalre queue")
// 消费信息
msgs, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for msg := range msgs {
log.Printf("Received a message: %s", msg.Body)
}
}()
<-forever
}
让我们来试下
pub.go
package main
import (
"fmt"
"github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
"strconv"
"time"
)
func main() {
rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")
for i:=1; i<=100; i++ {
rmq.PublishPub("this is pub message: " + strconv.Itoa(i))
fmt.Printf("this is pub message: " + strconv.Itoa(i))
time.Sleep(1 * time.Second)
}
}
sub.go
package main
import (
"github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
)
func main() {
rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")
rmq.ReceiveSub()
}