Exchanges(交换器)

在本教程的前面部分中,我们向队列发送消息和从队列接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。
让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。
相反,生产者只能将消息发送到交换器。交换器是非常简单的东西。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换器必须确切知道如何处理接收到的消息。它应该被附加到特定的队列吗?还是应该将其附加到许多队列中?或者它应该被丢弃。这些规则由交换器的类型定义。
发布/订阅 - 图1

有几种交换器类型可用:direct, topic, headersfanout。我们将集中讨论最后一个——fanout。让我们创建一个这种类型的交换器,并给它起个名字叫logs

  1. err = ch.ExchangeDeclare(
  2. "logs", // name
  3. "fanout", // type
  4. true, // durable
  5. false, // auto-deleted
  6. false, // internal
  7. false, // no-wait
  8. nil, // arguments
  9. )

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。

交换器列表

要列出服务器上的交换器,你可以执行有用的rabbitmqctl命令:

  1. sudo rabbitmqctl list_exchanges

在此列表中,将有一些amq.*交换器和一个默认的(未命名)交换器。这些是默认创建的,但是你现在不太可能需要使用它们。

默认交换器

在本教程的前面部分中,我们还不知道交换器的存在,但仍然能够将消息发送到队列。之所以能这样做,是因为我们使用的是默认交换器,该交换器由空字符串("")标识。
回想一下我们之前是怎么发布消息的:

  1. err = ch.Publish(
  2. "", // exchange
  3. q.Name, // routing key
  4. false, // mandatory
  5. false, // immediate
  6. amqp.Publishing{
  7. ContentType: "text/plain",
  8. Body: []byte(body),
  9. })

在这里,我们使用默认或无名称的交换器:消息将以route_key参数指定的名称路由到队列(如果存在)。
现在,我们可以改为发布到我们的命名交换器:

  1. err = ch.ExchangeDeclare(
  2. "logs", // 使用命名的交换器
  3. "fanout", // 交换器类型
  4. true, // durable
  5. false, // auto-deleted
  6. false, // internal
  7. false, // no-wait
  8. nil, // arguments
  9. )
  10. failOnError(err, "Failed to declare an exchange")
  11. body := bodyFrom(os.Args)
  12. err = ch.Publish(
  13. "logs", // exchange
  14. "", // routing key
  15. false, // mandatory
  16. false, // immediate
  17. amqp.Publishing{
  18. ContentType: "text/plain",
  19. Body: []byte(body),
  20. })

临时队列

你可能还记得,先前我们使用的是具有特定名称的队列(还记得hellotask_queue吗?)能够命名队列对我们来说至关重要——我们需要将工作人员指向同一个队列。当你想在生产者和消费者之间共享队列时,给队列一个名称非常重要。
但对于我们的记录器来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是它们的一部分。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,我们需要两件事。
首先,当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。
其次,一旦我们断开消费者的连接,队列就会自动删除。
amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列:

  1. q, err := ch.QueueDeclare(
  2. "", // 空字符串作为队列名称
  3. false, // 非持久队列
  4. false, // delete when unused
  5. true, // 独占队列(当前声明队列的连接关闭后即被删除)
  6. false, // no-wait
  7. nil, // arguments
  8. )

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg
当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。
你可以在队列指南中了解有关exclusive标志和其他队列属性的更多信息。

绑定

发布/订阅 - 图2
我们已经创建了一个扇出交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定

  1. err = ch.QueueBind(
  2. q.Name, // queue name
  3. "", // routing key
  4. "logs", // exchange
  5. false,
  6. nil,
  7. )

从现在开始,logs交换器将会把消息添加到我们的队列中。

列出绑定关系
你猜也猜到了,我们可以使用下面的命令列出绑定关系

  1. > rabbitmqctl list_bindings
  2. >

  1. 完整示例<br />![img](/images/Go/rabbitmq/tutorials03/python-three-overall.png)<br />产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到`logs`交换器,而不是空的消息交换器。发送时,我们需要提供一个`routingKey`,但是对于`fanout`型交换器,它的值可以被忽略(传空字符串)。下面是`emit_log.go`脚本的代码:
  2. ```go
  3. package main
  4. import (
  5. "log"
  6. "os"
  7. "strings"
  8. "github.com/streadway/amqp"
  9. )
  10. func failOnError(err error, msg string) {
  11. if err != nil {
  12. log.Fatalf("%s: %s", msg, err)
  13. }
  14. }
  15. func main() {
  16. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  17. failOnError(err, "Failed to connect to RabbitMQ")
  18. defer conn.Close()
  19. ch, err := conn.Channel()
  20. failOnError(err, "Failed to open a channel")
  21. defer ch.Close()
  22. err = ch.ExchangeDeclare(
  23. "logs", // name
  24. "fanout", // type
  25. true, // durable
  26. false, // auto-deleted
  27. false, // internal
  28. false, // no-wait
  29. nil, // arguments
  30. )
  31. failOnError(err, "Failed to declare an exchange")
  32. body := bodyFrom(os.Args)
  33. err = ch.Publish(
  34. "logs", // exchange
  35. "", // routing key
  36. false, // mandatory
  37. false, // immediate
  38. amqp.Publishing{
  39. ContentType: "text/plain",
  40. Body: []byte(body),
  41. })
  42. failOnError(err, "Failed to publish a message")
  43. log.Printf(" [x] Sent %s", body)
  44. }
  45. func bodyFrom(args []string) string {
  46. var s string
  47. if (len(args) < 2) || os.Args[1] == "" {
  48. s = "hello"
  49. } else {
  50. s = strings.Join(args[1:], " ")
  51. }
  52. return s
  53. }

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。
如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。
receive_logs.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )
  6. func failOnError(err error, msg string) {
  7. if err != nil {
  8. log.Fatalf("%s: %s", msg, err)
  9. }
  10. }
  11. func main() {
  12. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  13. failOnError(err, "Failed to connect to RabbitMQ")
  14. defer conn.Close()
  15. ch, err := conn.Channel()
  16. failOnError(err, "Failed to open a channel")
  17. defer ch.Close()
  18. err = ch.ExchangeDeclare(
  19. "logs", // name
  20. "fanout", // type
  21. true, // durable
  22. false, // auto-deleted
  23. false, // internal
  24. false, // no-wait
  25. nil, // arguments
  26. )
  27. failOnError(err, "Failed to declare an exchange")
  28. q, err := ch.QueueDeclare(
  29. "", // name
  30. false, // durable
  31. false, // delete when unused
  32. true, // exclusive
  33. false, // no-wait
  34. nil, // arguments
  35. )
  36. failOnError(err, "Failed to declare a queue")
  37. err = ch.QueueBind(
  38. q.Name, // queue name
  39. "", // routing key
  40. "logs", // exchange
  41. false,
  42. nil,
  43. )
  44. failOnError(err, "Failed to bind a queue")
  45. msgs, err := ch.Consume(
  46. q.Name, // queue
  47. "", // consumer
  48. true, // auto-ack
  49. false, // exclusive
  50. false, // no-local
  51. false, // no-wait
  52. nil, // args
  53. )
  54. failOnError(err, "Failed to register a consumer")
  55. forever := make(chan bool)
  56. go func() {
  57. for d := range msgs {
  58. log.Printf(" [x] %s", d.Body)
  59. }
  60. }()
  61. log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  62. <-forever
  63. }

receive_logs.go源码
如果要将日志保存到文件,只需打开控制台并输入:

  1. go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

  1. go run receive_logs.go

当然,要发出日志,请输入:

  1. go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

  1. sudo rabbitmqctl list_bindings
  2. # => Listing bindings ...
  3. # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
  4. # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
  5. # => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。