“发布/订阅”模式,将分发一个消息给多个消费者(consumers)。
交换机(Exchanges)
前面的教程中,我们发送消息到队列并从中取出消息。现在是时候介绍RabbitMQ中完整的消息模型了。
让我们简单的概括一下之前的教程:
- 发布者(producer)是发布消息的应用程序。
- 队列(queue)用于消息存储的缓冲。
- 消费者(consumer)是接收消息的应用程序。
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
交换机类型
有几个可供选择的交换机类型:direct, topic, headers和fanout。
实现
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {rmq := NewRabbitMQ("", exchangeName, "")var err error// 获取 connectionrmq.conn, err = amqp.Dial(rmq.MqUrl)rmq.failOnErr(err, "创建连接错误")rmq.channel, err = rmq.conn.Channel()rmq.failOnErr(err, "创建连接错误")return rmq}// 订阅生成模式func (r *RabbitMQ) PublishPub(message string) {// 1.尝试创建交换机err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,false,nil,)r.failOnErr(err, "Failed to declare exchange.")// 2.发送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),})r.failOnErr(err, "Failed to publish message.")}func (r *RabbitMQ) ReceiveSub() {// 1.尝试创建交换机err := r.channel.ExchangeDeclare(r.Exchange,// 交换机类型"fanout",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare exchange")// 2.试探创建队列q, err := r.channel.QueueDeclare("",false,false,true,false,nil,)r.failOnErr(err, "Failed to decalre queue")// 绑定队列到 exchange中err = r.channel.QueueBind(q.Name,// 在pub/sub模式下, 这里的key必须为空"",r.Exchange,false,nil,)r.failOnErr(err, "Failed to decalre queue")// 消费信息msgs, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for msg := range msgs {log.Printf("Received a message: %s", msg.Body)}}()<-forever}
让我们来试下
pub.go
package mainimport ("fmt""github.com/WenkeZhou/flash-sale/pkg/rabbitmq""strconv""time")func main() {rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")for i:=1; i<=100; i++ {rmq.PublishPub("this is pub message: " + strconv.Itoa(i))fmt.Printf("this is pub message: " + strconv.Itoa(i))time.Sleep(1 * time.Second)}}
sub.go
package mainimport ("github.com/WenkeZhou/flash-sale/pkg/rabbitmq")func main() {rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")rmq.ReceiveSub()}
