消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。

消息队列使用场景

异步处理
RabbitMQ教程 - 图1
流量削峰
RabbitMQ教程 - 图2
应用解耦
RabbitMQ教程 - 图3
日志处理
RabbitMQ教程 - 图4

安装

docker

docker run —name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management

15672 是 浏览器端口
5672 是 rabbitMQ端口
rabbitmq:management和rabbitmq:latest镜像的区别: manager版本有Web管理界面。

RabbitMQ消息模型

RabbitMQ教程 - 图5

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Connection
    网络连接,比如一个TCP连接。
  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  • Broker
    表示消息队列服务器实体。

RabbitMQ支持以下五种消息模型,第六种RPC本质上是服务调用,所以不算做服务通信消息模型。

Hello World

RabbitMQ教程 - 图6
P(producer/ publisher):生产者,发送消息的服务
C(consumer):消费者,接收消息的服务
红色区域就是MQ中的Queue,可以把它理解成一个邮箱

  • 首先信件来了不强求必须马上马去拿
  • 其次,它是有最大容量的(受主机和磁盘的限制,是一个缓存区)
  • 允许多个消费者监听同一个队列,争抢消息

Worker模型

RabbitMQ教程 - 图7
Worker模型中也只有一个工作队列。但它是一种竞争消费模式。可以看到同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积。
比如对于短信微服务集群来说就可以使用这种消息模型,来了请求大家抢着消费掉。
如何实现这种架构:对于上面的HelloWorld这其实就是相同的服务我们启动了多次罢了,自然就是这种架构。

订阅模型

订阅模型借助一个新的概念:Exchange(交换机)实现,不同的订阅模型本质上是根据交换机(Exchange)的类型划分的。
订阅模型有三种

  1. Fanout(广播模型): 将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)。
  2. Direct(定向): 把消息发送给拥有指定Routing Key (路由键)的队列。
  3. Topic(通配符): 把消息传递给拥有 符合Routing Patten(路由模式)的队列。

订阅之Fanout模型

RabbitMQ教程 - 图8
这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播。

订阅之Direct模型

RabbitMQ教程 - 图9
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key 和广播模型区分开来。

订阅之Topic模型

RabbitMQ教程 - 图10
类似于Direct模型。区别是Topic的Routing Key支持通配符。


RabbitMQ Management使用

登录

打开http://ip:15672/
默认账户和密码均为 guest (docker 运行的rabbitmq, 默认guest拥有管理员权限)

若出现不能远程登录,参考如下
https://www.cnblogs.com/ZhuChangwu/p/14093107.html

新增用户

image.png

添加Virtual host

image.png
tag可以指定那一类用户可以使用此virtual host,若需单独指定:

  1. 点击需要添加的用户

image.png

  1. 指定

image.png

查看运行状态

image.png

golang

Hello World

RabbitMQ教程 - 图16
发送方

  1. // 连接rabbirmq broker
  2. // 连接字符串: amqp:// 账户名 : 密码 @ IP : 端口 vhost
  3. // amqp.Dial: 将会设置心跳检测间隔 10秒, handshake时间 30秒
  4. conn, err := amqp.Dial("amqp://guest:guest@192.168.1.12:5672/")
  5. defer conn.Close()
  6. // 创建channel
  7. ch, err := conn.Channel()
  8. defer ch.Close()
  9. // 声明queue,后续往这个队列中发送消息
  10. q, err := ch.QueueDeclare(
  11. "hello", // name
  12. false, // durable
  13. false, // delete when unused
  14. false, // exclusivenei
  15. false, // no-wait
  16. nil, // arguments
  17. )
  18. // 向exchanger发送消息
  19. err = ch.Publish(
  20. "", // exchange
  21. q.Name, // routing key
  22. false, // mandatory 强制
  23. false, // immediate 及时
  24. amqp.Publishing{
  25. ContentType: "text/plain",
  26. Body: []byte(body),
  27. })

连接字符串: amqp:// 账户名 : 密码 @ IP : 端口 vhost
amqp.Dial: 将会设置心跳检测间隔 10秒, handshake时间 30秒

(ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

声明一个queue或者匹配一个参数相同的queue, queue将会绑定到 direct 模式的empty exchange “”, 并将queue的name作为routing key。生产者与消费者只需要有一方执行QueueDeclare即可。
name参数为空,则返回的q中的name将会返回一个唯一的name

Durable = true, Auto-Deleted=false: 在服务重启及没有消费者或绑定之后依然存活,但可以被消费者通过取消或关闭channel的方式删除。 在服务重启时,Persistent publishing将会保存在queue中。queue只能绑定到持久话exchanges。
Durable = false, Auto-Deleted=true :非持久队列和自动删除队列不会在服务器重启时重新声明,并且会在最后一个消费者被取消或最后一个消费者的通道被关闭后的短时间内被服务器删除。具有此生存期的队列也可以使用queueddelete正常删除。这些持久性队列只能绑定到非持久性exchanges。
Durable = false, Auto-Deleted=false: 在服务器运行期间,不管有多少消费者,非持久性和非自动删除队列都将重新声明。队列只能绑定到非持久交换器。
Durable = true, Auto-Deleted=true: 队列将在服务器重启时恢复,但没有活动消费者的队列将无法存活并被删除。

Exclusive: 队列只能被声明它们的连接访问,并且将在连接关闭时被删除。当试图声明、绑定、使用、清除或删除具有相同名称的队列时,其他连接上的通道将收到一个错误。

noWait: 当noWait为真时,队列将假定是在服务器上声明的。如果满足现有队列的条件,或试图从不同连接修改现有队列,则通道异常将到达。

如果返回err, channel将会被关闭。

(ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error

mandatory为true且没有queue与routing key匹配时,Publishings将会失败。
immediate为true时,若匹配queue中的消费者没有准备好接收数据,Publishings将会失败。

接收方

  1. conn, err := amqp.Dial("amqp://guest:guest@192.168.1.2:5672/test")
  2. failOnError(err, "Failed to connect to RabbitMQ")
  3. defer conn.Close()
  4. ch, err := conn.Channel()
  5. failOnError(err, "Failed to open a channel")
  6. defer ch.Close()
  7. q, err := ch.QueueDeclare(
  8. "hello", // name
  9. false, // durable
  10. false, // delete when unused
  11. false, // exclusive
  12. false, // no-wait
  13. nil, // arguments
  14. )
  15. failOnError(err, "Failed to declare a queue")
  16. // 公平分发
  17. err = ch.Qos(
  18. 1, // prefetch count
  19. 0, // prefetch size
  20. false, // global
  21. )
  22. failOnError(err, "Failed to declare a queue")
  23. msgs, err := ch.Consume(
  24. q.Name, // queue
  25. "", // consumer
  26. true, // auto-ack
  27. false, // exclusive
  28. false, // no-local
  29. false, // no-wait
  30. nil, // args
  31. )
  32. failOnError(err, "Failed to register a consumer")
  33. forever := make(chan bool)
  34. go func() {
  35. for d := range msgs {
  36. log.Printf("Received a message: %s", d.Body)
  37. }
  38. }()
  39. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  40. <-forever

(ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

consumer:用于标识消费者,当调用Channel.Cancel时需要传入。当consumer参数被设置为空字符时,将会生成一个唯一的id,并保存在每一次接收到数据的ConsumerTag中。
autoAck: 当消息者将消息从channel中取出并收到ACK后,RabbitMQ随即将消息给删除。所有在AMQP的交付必须被确认。它期望消费者调用交付。在成功处理交付之后进行确认。如果使用者被取消或通道或连接被关闭,任何未确认的传递将在同一队列的末尾重新排队。autoAck若设置为true, server将会在消息发送到网络之前ack,当消费者不能及时处理时会导致消息丢失。
exclusive:为true时,服务器将确保这是该队列中的唯一使用者。当exclusive为false时,服务器将公平地在多个使用者之间分发交付。
noLocal: RabbitMQ不支持。
no-wait:当noWait为true时,不要等待服务器确认请求并立即开始交付。如果不合理使用,将引发通道异常并关闭通道。
main.go

  1. func main() {
  2. go receive()
  3. for true {
  4. send()
  5. }
  6. }

Worker 模式

RabbitMQ教程 - 图17
send和receive和上面相同,只是在main中多运行一个receive。

  1. func main() {
  2. go receive()
  3. go receive()
  4. for true {
  5. send()
  6. }
  7. }

消息分发策略:默认情况下RabbitMQ后将P生产的消息以round-robin的策略分发给C1、C2。
公平分发:处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

Exchange

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

  1. direct
    RabbitMQ教程 - 图18
    消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
  2. fanout
    RabbitMQ教程 - 图19
    每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
  3. topic
    RabbitMQ教程 - 图20
    topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

Diret模式

send

  1. func send() {
  2. conn, err := amqp.Dial("amqp://guest:guest@192.168.1.2:5672/test")
  3. failOnError(err, "Failed to connect to RabbitMQ")
  4. defer conn.Close()
  5. ch, err := conn.Channel()
  6. failOnError(err, "Failed to open a channel")
  7. defer ch.Close()
  8. err = ch.ExchangeDeclare(
  9. "ex",
  10. "direct", // direct, topic, headers 和 fanout
  11. false,
  12. false,
  13. false,
  14. false,
  15. nil)
  16. failOnError(err, "Failed to declare a Exchange")
  17. body := "Hello World!"
  18. err = ch.Publish(
  19. "ex", // exchange
  20. "routing", // routing key
  21. false, // mandatory
  22. false, // immediate
  23. amqp.Publishing{
  24. ContentType: "text/plain",
  25. Body: []byte(body),
  26. })
  27. failOnError(err, "Failed to publish a message")
  28. //log.Printf(" [x] Sent %s", body)
  29. }

receive

  1. func receive(id string) {
  2. conn, err := amqp.Dial("amqp://guest:guest@192.168.1.2:5672/test")
  3. failOnError(err, "Failed to connect to RabbitMQ")
  4. defer conn.Close()
  5. ch, err := conn.Channel()
  6. failOnError(err, "Failed to open a channel")
  7. defer ch.Close()
  8. q, err := ch.QueueDeclare(
  9. "hello", // name
  10. false, // durable
  11. false, // delete when unused
  12. false, // exclusive
  13. false, // no-wait
  14. nil, // arguments
  15. )
  16. failOnError(err, "Failed to declare a queue")
  17. err = ch.QueueBind(q.Name,
  18. "routing",
  19. "ex",
  20. false,
  21. nil)
  22. failOnError(err, "Failed to open a channel")
  23. msgs, err := ch.Consume(
  24. "hello", // queue
  25. "", // consumer
  26. true, // auto-ack
  27. false, // exclusive
  28. false, // no-local
  29. false, // no-wait
  30. nil, // args
  31. )
  32. failOnError(err, "Failed to register a consumer")
  33. forever := make(chan bool)
  34. go func() {
  35. for d := range msgs {
  36. log.Printf("[%s] Received a message: %s",id, d.Body)
  37. }
  38. }()
  39. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  40. <-forever
  41. }

Fanout模型

RabbitMQ教程 - 图21

Topic模型

type为topic、并别routingkey支持正则表达式。

参考

【详细】教程
golang rabbitMQ代码示例
golang rabbitMQ客户端库