simple 模式是RabbitMQ 里最基础的一种模式,其他模式是类似于 simple 模式,参数配置变化,发送和接受者稍作改动。

模型图

simple 模式的模型图很简单。
生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。

队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。

消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。

image.png
这里直接给出代码, 代码里有对应的注释
新建 rabbitmq.go

  1. package rabbitmq
  2. import (
  3. "fmt"
  4. "github.com/streadway/amqp"
  5. "log"
  6. )
  7. // 连接信息
  8. const MQURL = "amqp://guest:guest@localhost:5672/"
  9. // rabbitMQ 连接体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. QueueName string // 队列名称
  14. Exchange string // 交换机名称
  15. Key string // bind key 名称
  16. MqUrl string // 连接信息
  17. }
  18. // 创建RabbitMQ结构体实例
  19. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  20. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}
  21. }
  22. // 断开 channel 和 connection
  23. func (r *RabbitMQ) Destroy() {
  24. r.channel.Close()
  25. r.conn.Close()
  26. }
  27. // 错误处理函数
  28. func (r *RabbitMQ) failOnErr(err error, message string) {
  29. if err != nil {
  30. log.Fatalf("[err]%s, [message]%s", err, message)
  31. }
  32. }
  33. // 创建简单模式下RabbitMQ实例
  34. func NewRabbitMQSimple(queueName string) *RabbitMQ {
  35. rmq := NewRabbitMQ(queueName, "", "")
  36. var err error
  37. // 获取 connection
  38. rmq.conn, err = amqp.Dial(rmq.MqUrl)
  39. rmq.failOnErr(err, "创建连接错误")
  40. rmq.channel, err = rmq.conn.Channel()
  41. rmq.failOnErr(err, "创建连接错误")
  42. return rmq
  43. }
  44. func (r *RabbitMQ) PublishSimple(message string) {
  45. // 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建
  46. // 保证队列存在, 消息能发送到队列中
  47. _, err := r.channel.QueueDeclare(
  48. // name
  49. r.QueueName,
  50. // durable, 是否持久化
  51. false,
  52. // autoDelete, 是否为自动化删除
  53. false,
  54. // exclusive, 是否具有排他性
  55. false,
  56. // noWait, 是否阻塞
  57. false,
  58. // args,
  59. nil,
  60. )
  61. if err != nil {
  62. fmt.Println(err)
  63. }
  64. // 2.发送消息到队列中
  65. r.channel.Publish(
  66. r.Exchange,
  67. r.QueueName,
  68. // 如果为true, 根据 exchange 类型和 routekey 规则,
  69. // 如果无法找到该队列,那么会把发送的消息返回给发送者。
  70. false,
  71. // 如果为ture时, 当 exchange 发送消息到队列后发现队列上没有绑定消费者,
  72. // 这会把消息返回给消费者。
  73. false,
  74. amqp.Publishing{
  75. ContentType: "text/plain",
  76. Body: []byte(message),
  77. },
  78. )
  79. }
  80. func (r *RabbitMQ) ConsumeSimple() {
  81. // 1.申请队列, 如果队列不存在会自动创建, 如果存在则跳过创建
  82. // 保证队列存在, 消息能发送到队列中
  83. _, err := r.channel.QueueDeclare(
  84. r.QueueName,
  85. // 是否持久化
  86. false,
  87. // 是否为自动化删除
  88. false,
  89. // 是否具有排他性
  90. false,
  91. // 是否阻塞
  92. false,
  93. nil,
  94. )
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. msgs, err := r.channel.Consume(
  99. r.QueueName,
  100. // 用来区分多个消费者
  101. "",
  102. // 是否自动应答, 默认为true; 为false 需要手动实现反馈回调
  103. true,
  104. // 是否具有排他性,
  105. false,
  106. // 如果设置为true, 表示不能将同一个conenction中发送的消息传递给这个connection中的消费者
  107. false,
  108. // 队列消费是否阻塞, false 表示阻塞。
  109. false,
  110. nil,
  111. )
  112. if err != nil {
  113. fmt.Println(err)
  114. }
  115. forever := make(chan bool)
  116. // 启动协程处理消息
  117. go func() {
  118. for d := range msgs {
  119. // 实现我们要处理的逻辑函数
  120. log.Printf("Receive a message: %s", d.Body)
  121. }
  122. }()
  123. log.Printf("[*] Waiting for message. To exit to press CTRL+C")
  124. <-forever
  125. }

我来测试下
send.go

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
  5. )
  6. func main() {
  7. rmq := rabbitmq.NewRabbitMQSimple("simple")
  8. rmq.PublishSimple("hello this is at test!")
  9. fmt.Println("发送成功")
  10. }

receive.go

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/WenkeZhou/flash-sale/pkg/rabbitmq"
  5. )
  6. func main() {
  7. rmq := rabbitmq.NewRabbitMQSimple("simple")
  8. rmq.ConsumeSimple()
  9. fmt.Println("发送成功")
  10. }