1. RabbitMQ

1.1. 概念

1.1.1. AMQP协议

高级消息队列协议(Advanced Message Queuing Protocol),是一种二进制应用层协议,用于应对广泛的面向消息应用程序的支持。协议提供了消息流控制,保证的一个消息对象的传递过程,如至多一次、保证多次、仅有一次等,和基于SASL和TLS的身份验证和消息加密。实现AMQP协议的中间件有: RabbitMQ,ActiveMQ,Qpid,JORAM等。

1.1.2. RabbitMQ相关概念

  • Broker 消息传输服务,等同于RabbitMQ Server本身
  • Virtual Host 虚拟主机,用于资源隔离,不同的虚拟主机下的资源相互隔离
  • Queue 队列,消息存储器
  • Exchange 交换机,接收客户端发来的消息,并根据路由方式将消息投递到队列中。交换机存在不同类型。
  • Message Routing Key 消息在Exchange中路由规则的索引,用于指定当前消息采用的路由规则
  • Bind Routing Key 指定消息在Exchange与Queue绑定关系的Routing Key,也被称为Binding Key

    1.1.3. RabbitMQ使用场景

  • 项目解耦 在微服务场景中非常常见,一个大的单体服务拆成若干小服务,不同服务之间进行数据交换可以通过RabbitMQ

  • 流量削峰 在突如其来的大并发场景中(常见于商品秒杀),使用RabbitMQ进行缓存,避免流量直接打垮后端服务
  • 异步处理 当前服务将任务分发给其它服务,由其它服务处理后上报结果到MQ后被消费

    1.1.4. 安装

  • 个人学习环境可以直接启动一个 RabbitMQ 容器,仓库

  • 生产环境中需要安装一个RabbitMQ 集群,并配置高可用策略,运维指南:集群安装镜像队列Metrics备份与恢复AnsibleRole
  • 测试环境中可以使用Chart 按照RabbitMQ集群,安装方式:Chart

    1.2. RabbitMQ入门操作

  • 创建 vhost (一般不使用默认的 / )

  • 创建 Queue
  • 创建 Exchange
  • 绑定 Exchange 与 Queue
  • 发送消息
  • 接收消息

    1.2.1. 资源创建

    1.2.1.1. 创建vhost与queue

    image.png
    创建四个队列方便演示(qn):
    image.png
    image.png

    1.2.1.2. 创建交换机

    这里创建四种不同类型的交换机(type-test),方便演示
    image.png
    image.png

    1.2.2. 测试交换机

    RabbitMQ中,生产者是没有办法将消息直接发送到目标队列的,甚至都不知道目标队列名称,也不知道会被哪些服务消费。
    生产者只需要正确投递消息给broker的交换机,然后RabbitMQ内部进行消息路由,分发消息到目标队列,消费者从目标队列获取消息即可!
    默认交换机:当发布消息时,Exachage为空,将采用默认交换机,该交换机自动为direct类型。创建队列时,会以队列名称为key,自动绑定到默认交换机上!
    image.png

    1.2.2.1. 测试fanout类型交换机

    这类交换机在被称为广播模式,即所有绑定到该交换机上的queue都会收到消息,如多个服务同时监听某个应用上报的信息,那么采用广播比较合适,性能也很优异。
    image.png
    image.png
    image.png
    消费队列中的消息,q1到q4全部消费
    image.png

    1.2.2.2. 测试direct类型

    指定routing key绑定不同的队列,这种在生产环境中是最常见的!
    image.png
    image.png
    image.png
    image.png

    1.2.2.3. 测试topic类型

    topic 交换机就是在direct基础上增加了模糊匹配功能,性能相对于derict有所下降,但是模式匹配功能增强了扩展能力。
    其中 * 表示一个单词, # 表示零个或者多个单词。通常topic使用具体单词+ . +通配符,如 k8s.*.* #.ddn.com
    image.png
    image.png
    image.png

    1.2.2.4. 测试header类型

    这种类型根据发送消息的 Headers 来进行消息分发,性能较差,使用的较少。此处不再演示!

    1.3. golang操作RabbitMQ

    1.3.1. 入门

    当前包中,queue声明、exchange声明、queue和exchage绑定都是幂等操作,即重复执行不会报错。生产环境中,可能不允许程序申请这些信息,程序要使用的queue、exchange、Key 很可能是通过环境变量或者配置中心注入的。 ```go package main

import ( “github.com/streadway/amqp” “go_learn/logger” “time” )

func main() { // 打开连接 conn, err := amqp.Dial(“amqp://admin:123456@127.0.0.1:5672/golang”) if err != nil { logger.Errorf(“dail to mq server failed, err:%s”) return } defer func() { _= conn.Close()}()

  1. // 声明通道, 一个TCP连接可以申请多个通道,不同的协程可以使用不同的通道来增强性能
  2. channel, err := conn.Channel()
  3. if err != nil {
  4. logger.Errorf("open channel failed, err:%s")
  5. return
  6. }
  7. defer func() { _= channel.Close()}()
  8. // 队列声明
  9. /* (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
  10. name: 声明的队列名称,如果不填为返回一个随机名称
  11. durable: 是否持久化队列,这里针对的时队列信息,而不是消息。消息的持久化是需要在发布消息时指定
  12. autoDelete: 自动删除,当没有消费者时,该队列会被删除。并不是立刻被删除
  13. exclusive: 独享队列,当连接端开就会被删除,并且只能由当前连接使用
  14. noWait: 如果这个有满足该队列条件的,或者其它连接在修改当前的队列则立刻返回错误
  15. */
  16. queue, err := channel.QueueDeclare("queue-1", true, false, false, false, nil)
  17. if err != nil {
  18. logger.Errorf("declare queue failed, err:%s")
  19. return
  20. }
  21. // 交换机声明
  22. if channel.ExchangeDeclare("direct-test", "direct", true, false, false, false, nil) != nil {
  23. logger.Errorf("declare exchange failed, err:%s")
  24. return
  25. }
  26. // queue与交换机绑定
  27. /* (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
  28. name: queue名称
  29. key: routine key
  30. exchange: exchange名称
  31. */
  32. if channel.QueueBind(queue.Name, "key-1", "direct-test", false, nil) != nil {
  33. logger.Errorf("bind queue to exchange failed, err:%s")
  34. return
  35. }
  36. go func() {
  37. for {
  38. // 定义消息
  39. /*
  40. Headers: header
  41. ContentType: MIME 类型
  42. DeliveryMode: 是否持久化,2表示持久化,0或者1表示非持久化
  43. Body: 消息体
  44. */
  45. msg := amqp.Publishing{
  46. ContentType: "text/plain",
  47. DeliveryMode: amqp.Persistent,
  48. Body: []byte("Hello world,"+ time.Now().Format("2006-01-02 15:04:05")),
  49. }
  50. // 发布消息
  51. /* (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing)
  52. exchange: 交换机名称,空表示默认交换机
  53. key: 消息发布时指定的 key
  54. mandatory: 为true时,如果当前消息无法路由到任何queue中,消息则返回给发布者
  55. immediate: 为true时,如果当消息路由的目标queued都没有消费者,则返回消息给发布者
  56. */
  57. err = channel.Publish("direct-test", "key-1", false,false, msg)
  58. if err != nil {
  59. logger.Errorf("publish message failed, err:%s")
  60. return
  61. }
  62. time.Sleep(time.Second)
  63. }
  64. }()
  65. go func() {
  66. // 定义消费者
  67. /* (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  68. queue: 队列名称
  69. consumer: 消费者名称,为空则自动指定名称
  70. autoAck: 是否自动确认消息
  71. exclusive: 独享当前队列,其它消费者不能消费
  72. noLocal: rabbitmq不支持
  73. noWait: 要求立即消费,如果为true,且队列为空,会导致报错
  74. */
  75. deliveryChan, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
  76. if err != nil {
  77. logger.Errorf("create consumer failed, err:%s")
  78. return
  79. }
  80. for delivery := range deliveryChan {
  81. logger.Infof("msg:%s", string(delivery.Body))
  82. _ = delivery.Ack(false) // true 表示确认所有未ack的消息,false表示仅确认当前这一条消息
  83. }
  84. }()
  85. select {}

}

  1. ```
  2. [root@duduniao easy]# go run main.go
  3. 2021-01-17 16:53:59.739|msg:Hello world,2021-01-17 16:53:59
  4. 2021-01-17 16:54:00.74|msg:Hello world,2021-01-17 16:54:00
  5. 2021-01-17 16:54:01.74|msg:Hello world,2021-01-17 16:54:01
  6. 2021-01-17 16:54:02.741|msg:Hello world,2021-01-17 16:54:02