“发布/订阅”模式,将分发一个消息给多个消费者(consumers)。
image.png

交换机(Exchanges)

前面的教程中,我们发送消息到队列并从中取出消息。现在是时候介绍RabbitMQ中完整的消息模型了。
让我们简单的概括一下之前的教程:

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

交换机类型

有几个可供选择的交换机类型:direct, topic, headers和fanout。

实现

  1. func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
  2. rmq := NewRabbitMQ("", exchangeName, "")
  3. var err error
  4. // 获取 connection
  5. rmq.conn, err = amqp.Dial(rmq.MqUrl)
  6. rmq.failOnErr(err, "创建连接错误")
  7. rmq.channel, err = rmq.conn.Channel()
  8. rmq.failOnErr(err, "创建连接错误")
  9. return rmq
  10. }
  11. // 订阅生成模式
  12. func (r *RabbitMQ) PublishPub(message string) {
  13. // 1.尝试创建交换机
  14. err := r.channel.ExchangeDeclare(
  15. r.Exchange,
  16. "fanout",
  17. true,
  18. false,
  19. //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
  20. false,
  21. false,
  22. nil,
  23. )
  24. r.failOnErr(err, "Failed to declare exchange.")
  25. // 2.发送消息
  26. err = r.channel.Publish(
  27. r.Exchange,
  28. "",
  29. false,
  30. false,
  31. amqp.Publishing{
  32. ContentType: "text/plain",
  33. Body: []byte(message),
  34. })
  35. r.failOnErr(err, "Failed to publish message.")
  36. }
  37. func (r *RabbitMQ) ReceiveSub() {
  38. // 1.尝试创建交换机
  39. err := r.channel.ExchangeDeclare(
  40. r.Exchange,
  41. // 交换机类型
  42. "fanout",
  43. true,
  44. false,
  45. false,
  46. false,
  47. nil,
  48. )
  49. r.failOnErr(err, "Failed to declare exchange")
  50. // 2.试探创建队列
  51. q, err := r.channel.QueueDeclare(
  52. "",
  53. false,
  54. false,
  55. true,
  56. false,
  57. nil,
  58. )
  59. r.failOnErr(err, "Failed to decalre queue")
  60. // 绑定队列到 exchange中
  61. err = r.channel.QueueBind(
  62. q.Name,
  63. // 在pub/sub模式下, 这里的key必须为空
  64. "",
  65. r.Exchange,
  66. false,
  67. nil,
  68. )
  69. r.failOnErr(err, "Failed to decalre queue")
  70. // 消费信息
  71. msgs, err := r.channel.Consume(
  72. q.Name,
  73. "",
  74. true,
  75. false,
  76. false,
  77. false,
  78. nil,
  79. )
  80. forever := make(chan bool)
  81. go func() {
  82. for msg := range msgs {
  83. log.Printf("Received a message: %s", msg.Body)
  84. }
  85. }()
  86. <-forever
  87. }

让我们来试下
pub.go

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
  5. "strconv"
  6. "time"
  7. )
  8. func main() {
  9. rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")
  10. for i:=1; i<=100; i++ {
  11. rmq.PublishPub("this is pub message: " + strconv.Itoa(i))
  12. fmt.Printf("this is pub message: " + strconv.Itoa(i))
  13. time.Sleep(1 * time.Second)
  14. }
  15. }

sub.go

  1. package main
  2. import (
  3. "github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
  4. )
  5. func main() {
  6. rmq := rabbitmq.NewRabbitMQPubSub("exchange_test")
  7. rmq.ReceiveSub()
  8. }