topic交换器(主题交换器)

发送到topic交换器的消息不能具有随意的routing_key——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。routing_key中可以包含任意多个单词,最多255个字节。
绑定键也必须采用相同的形式。topic交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(星号)可以代替一个单词。
  • (井号)可以替代零个或多个单词。

通过下面这个示例可以很容易看明白这一点:
Topics - 图1
在这个例子中,我们将发送一些都是描述动物的信息。将使用包含三个词(两个点)的路由密钥发送消息。路由键中的第一个单词将描述速度,第二个是颜色,第三个是种类:“<speed>.<colour>.<species>”。
我们创建了三个绑定关系:Q1与绑定键“*.orange.*”绑定,Q2与“*.*.rabbit”和“lazy.#”绑定。
这些绑定可以总结为:

  • Q1对所有橙色动物都感兴趣。
  • Q2想接收有关兔子(rabbit)的一切消息,以及有关懒惰(lazy)动物的一切消息。

路由键设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将发送给他们两个。另一方面,“quick.orange.fox”将仅进入第一个队列,而“lazy.brown.fox”将仅进入第二个队列。即使“lazy.pink.rabbit”与两个绑定匹配(匹配Q2的两个绑定),也只会传递到第二个队列一次。 “quick.brown.fox”与任何绑定都不匹配,因此将被丢弃。
如果我们打破约定并发送一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不匹配任何绑定,并且将会丢失。
另外,“lazy.orange.male.rabbit”即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。

topic交换器 topic交换器功能强大,可以像其他交换器一样运行。 当队列用“#”(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout交换器中一样。 当在绑定中不使用特殊字符“*”(星号)和“#”(井号)时,topic交换器的行为就像direct交换器一样。

完整示例

我们将在日志记录系统中使用topic交换器。我们将从一个可行的假设开始,即日志的路由键将包含两个词:“<facility>.<severity>”。
该代码与上一教程中的代码几乎相同。
emit_log_topic.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "strings"
  6. "github.com/streadway/amqp"
  7. )
  8. func failOnError(err error, msg string) {
  9. if err != nil {
  10. log.Fatalf("%s: %s", msg, err)
  11. }
  12. }
  13. func main() {
  14. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  15. failOnError(err, "Failed to connect to RabbitMQ")
  16. defer conn.Close()
  17. ch, err := conn.Channel()
  18. failOnError(err, "Failed to open a channel")
  19. defer ch.Close()
  20. err = ch.ExchangeDeclare(
  21. "logs_topic", // name
  22. "topic", // type
  23. true, // durable
  24. false, // auto-deleted
  25. false, // internal
  26. false, // no-wait
  27. nil, // arguments
  28. )
  29. failOnError(err, "Failed to declare an exchange")
  30. body := bodyFrom(os.Args)
  31. err = ch.Publish(
  32. "logs_topic", // exchange
  33. severityFrom(os.Args), // routing key
  34. false, // mandatory
  35. false, // immediate
  36. amqp.Publishing{
  37. ContentType: "text/plain",
  38. Body: []byte(body),
  39. })
  40. failOnError(err, "Failed to publish a message")
  41. log.Printf(" [x] Sent %s", body)
  42. }
  43. func bodyFrom(args []string) string {
  44. var s string
  45. if (len(args) < 3) || os.Args[2] == "" {
  46. s = "hello"
  47. } else {
  48. s = strings.Join(args[2:], " ")
  49. }
  50. return s
  51. }
  52. func severityFrom(args []string) string {
  53. var s string
  54. if (len(args) < 2) || os.Args[1] == "" {
  55. s = "anonymous.info"
  56. } else {
  57. s = os.Args[1]
  58. }
  59. return s
  60. }

receive_logs_topic.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "github.com/streadway/amqp"
  6. )
  7. func failOnError(err error, msg string) {
  8. if err != nil {
  9. log.Fatalf("%s: %s", msg, err)
  10. }
  11. }
  12. func main() {
  13. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  14. failOnError(err, "Failed to connect to RabbitMQ")
  15. defer conn.Close()
  16. ch, err := conn.Channel()
  17. failOnError(err, "Failed to open a channel")
  18. defer ch.Close()
  19. err = ch.ExchangeDeclare(
  20. "logs_topic", // name
  21. "topic", // type
  22. true, // durable
  23. false, // auto-deleted
  24. false, // internal
  25. false, // no-wait
  26. nil, // arguments
  27. )
  28. failOnError(err, "Failed to declare an exchange")
  29. q, err := ch.QueueDeclare(
  30. "", // name
  31. false, // durable
  32. false, // delete when unused
  33. true, // exclusive
  34. false, // no-wait
  35. nil, // arguments
  36. )
  37. failOnError(err, "Failed to declare a queue")
  38. if len(os.Args) < 2 {
  39. log.Printf("Usage: %s [binding_key]...", os.Args[0])
  40. os.Exit(0)
  41. }
  42. // 绑定topic
  43. for _, s := range os.Args[1:] {
  44. log.Printf("Binding queue %s to exchange %s with routing key %s",
  45. q.Name, "logs_topic", s)
  46. err = ch.QueueBind(
  47. q.Name, // queue name
  48. s, // routing key
  49. "logs_topic", // exchange
  50. false,
  51. nil)
  52. failOnError(err, "Failed to bind a queue")
  53. }
  54. msgs, err := ch.Consume(
  55. q.Name, // queue
  56. "", // consumer
  57. true, // auto ack
  58. false, // exclusive
  59. false, // no local
  60. false, // no wait
  61. nil, // args
  62. )
  63. failOnError(err, "Failed to register a consumer")
  64. forever := make(chan bool)
  65. go func() {
  66. for d := range msgs {
  67. log.Printf(" [x] %s", d.Body)
  68. }
  69. }()
  70. log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  71. <-forever
  72. }

想要接收所有的日志:

  1. go run receive_logs_topic.go "#"

要从“kern”接收所有日志:

  1. go run receive_logs_topic.go "kern.*"

或者,如果你只想接收“critical”日志:

  1. go run receive_logs_topic.go "*.critical"

你可以创建多个绑定:

  1. go run receive_logs_topic.go "kern.*" "*.critical"

并发出带有路由键“kern.critical”的日志:

  1. go run emit_log_topic.go "kern.critical" "A critical kernel error"

你可以自己尝试玩一下这个程序。请注意,代码没有对路由键或绑定键进行任何假设,你可能希望使用两个以上的路由键参数。
(关于emit_log_topic.goreceive_logs_topic.go的完整源代码)
接下来,我们将在教程6中了解如何将往返消息用作远程过程调用。