前提条件

本教程假设RabbitMQ已经安装在你本机的 (5672)端口。如果你使用了不同的主机、端口或者凭证,连接设置就需要作出相应的修改。

概述

RabbitMQ是一个消息代理:它的作用就是接受并转发消息。可以把它想象成一个邮局:你把邮件放进邮箱,邮递员会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ是邮箱、邮局和邮递员的角色。
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接收、存储和发送二进制数据——消息。
下面是RabbitMQ和消息所涉及到的一些术语。

  • 生产意味着发送。发送消息的程序是生产者:

HelloWorld - 图1

  • 队列是位于RabbitMQ内部的邮箱的名称。尽管消息通过RabbitMQ和你的应用程序流动,但它们只能存储在队列中。队列只受主机内存和磁盘限制的限制,实际上它是一个大的消息缓冲区。许多生产者可以向一个队列发送消息,而许多消费者可以尝试从一个队列接收数据。以下是我们表示队列的方式:

HelloWorld - 图2

  • 消费与接收具有相似的含义。消费者(consumer)就是一个等待获取消息的程序:

HelloWorld - 图3
请注意,生产者,消费者和代理(broker)不必位于同一主机上。实际上,在大多数应用程序中它们不是。一个应用程序既可以是生产者,也可以是消费者。

Hello World

(使用Go RabbitMQ客户端)
在本教程的这一部分中,我们将在Go中编写两个小程序:发送单个消息的生产者和接收消息并将其打印出来的消费者。我们将忽略Go-RabbitMQ API中的一些细节,只关注非常简单的事情,以便开始教程。这是一个消息传递版的“Hello World”。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。
HelloWorld - 图4
Go RabbitMQ客户端库
RabbitMQ讲多种协议。本教程使用amqp0-9-1,这是一个开放的、通用的消息传递协议。RabbitMQ有许多不同语言的客户端。在本教程中,我们将使用Go amqp客户端。
首先,使用go get安装amqp

  1. go get github.com/streadway/amqp

现在安装好amqp之后,我们就可以编写一些代码。

发送

HelloWorld - 图5
我们将消息发布者(发送者)称为 send.go,将消息消费者(接收者)称为receive.go。发布者将连接到RabbitMQ,发送一条消息,然后退出。
send.go中,我们需要首先导入库:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )

我们还需要一个公共函数来检查每个amqp调用的返回值:

  1. func failOnError(err error, msg string) {
  2. if err != nil {
  3. log.Fatalf("%s: %s", msg, err)
  4. }
  5. }

然后连接到RabbitMQ服务器

  1. // 1. 尝试连接RabbitMQ,建立连接
  2. // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
  3. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4. failOnError(err, "Failed to connect to RabbitMQ")
  5. defer conn.Close()

配置连接套接字,它主要定义连接的协议和身份验证等。接下来,我们创建一个通道

  1. // 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
  2. ch, err := conn.Channel()
  3. failOnError(err, "Failed to open a channel")
  4. defer ch.Close()

要发送,我们必须声明要发送到的队列。然后我们可以将消息发布到队列:

  1. // 3. 声明消息要发送到的队列
  2. q, err := ch.QueueDeclare(
  3. "hello", // name
  4. false, // durable
  5. false, // delete when unused
  6. false, // exclusive
  7. false, // no-wait
  8. nil, // arguments
  9. )
  10. failOnError(err, "Failed to declare a queue")
  11. body := "Hello World!"
  12. // 4.将消息发布到声明的队列
  13. err = ch.Publish(
  14. "", // exchange
  15. q.Name, // routing key
  16. false, // mandatory
  17. false, // immediate
  18. amqp.Publishing {
  19. ContentType: "text/plain",
  20. Body: []byte(body),
  21. })
  22. failOnError(err, "Failed to publish a message")

声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。
点击查看完整的send.go文件

接收

以上内容是发布者。我们的消费者需要监听来自RabbitMQ的消息,因此与发布者不同,消费者保持持久运行状态以监听消息并打印出来。
HelloWorld - 图6
该代码(在receive.go中)具有与send相同的导入和帮助功能:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )
  6. func failOnError(err error, msg string) {
  7. if err != nil {
  8. log.Fatalf("%s: %s", msg, err)
  9. }
  10. }

设置与生产者相同,首先打开一个连接和一个Channel,并声明我们要消费的队列。请注意,这与发送的队列相一致

  1. // 建立连接
  2. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  3. failOnError(err, "Failed to connect to RabbitMQ")
  4. defer conn.Close()
  5. // 获取channel
  6. ch, err := conn.Channel()
  7. failOnError(err, "Failed to open a channel")
  8. defer ch.Close()
  9. // 声明队列
  10. q, err := ch.QueueDeclare(
  11. "hello", // name
  12. false, // durable
  13. false, // delete when unused
  14. false, // exclusive
  15. false, // no-wait
  16. nil, // arguments
  17. )
  18. failOnError(err, "Failed to declare a queue")

请注意,我们也在这里声明队列。因为我们可能在发布者之前启动使用者,所以我们希望在尝试使用队列中的消息之前确保队列存在。
我们将告诉服务器将队列中的消息传递给我们。由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由amqp::Consume返回)中读取消息。

  1. // 获取接收消息的Delivery通道
  2. msgs, err := ch.Consume(
  3. q.Name, // queue
  4. "", // consumer
  5. true, // auto-ack
  6. false, // exclusive
  7. false, // no-local
  8. false, // no-wait
  9. nil, // args
  10. )
  11. failOnError(err, "Failed to register a consumer")
  12. forever := make(chan bool)
  13. go func() {
  14. for d := range msgs {
  15. log.Printf("Received a message: %s", d.Body)
  16. }
  17. }()
  18. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  19. <-forever

点击完整的receive.go脚本

完整示例

现在我们可以运行两个脚本。在一个终端窗口,运行发布者:

  1. go run send.go

然后,运行使用者:

  1. go run receive.go

消费者将打印通过RabbitMQ从发布者那里得到的消息。使用者将持续运行,等待消息(使用Ctrl-C停止它),可以尝试从另一个终端运行发布者。
如果要检查队列,请尝试使用rabbitmqctl list_queues命令。接下来该继续教程的第二部分并建立一个简单的任务队列。