RabbitMQ 七种队列模式应用场景案例分析

一、消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
消息队列的核心作用:削峰、解耦、异步。

二、RabbitMQ 介绍

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层来保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、C、Ruby、.NET、Java、JMS、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

三、RabbitMQ 技术亮点

1、可靠性

RabbitMQ 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。

2、灵活的路由

消息在到达队列前是通过交换机进行路由的。RabbitMQ 为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做 RabbitMQ 的插件来使用。

3、集群

在相同局域网中的多个 RabbitMQ 服务器可以聚合在一起,作为一个独立的逻辑代理来使用。

4、联合

对于服务器来说,它比集群需要更多的松散和非可靠链接。为此 RabbitMQ 提供了联合模型。

5、高可用的队列

在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。

6、多协议

RabbitMQ 支持多种消息协议的消息传递。

7、广泛的客户端

只要是你能想到的编程语言几乎都有与其相适配的 RabbitMQ 客户端。

8、可视化管理工具

RabbitMQ 附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

9、追踪

如果你的消息系统有异常行为,RabbitMQ 还提供了追踪的支持,让你能够发现问题所在。

10、插件系统

RabbitMQ 附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

四、RabbitMQ 概念模型

1、消息模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列;生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。消费者和生产者是消息接收和消息发送概念的体现,而不是客户端和服务器端。
RabbitMQ - 图1

2、RabbitMQ 模型

RabbitMQ 在常规的消息模型上,多做了一层抽象,在发消息者和队列之间,加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系,转而变成发消息者把消息给交换器,交换器根据调度策略再把消息再给队列。
RabbitMQ - 图2

1、Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、虚拟主机

一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机/队列/绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

4、交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct、topic、Headers 和 Fanout。

  • Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据 key 全文匹配去寻找队列。Direct 类型的行为是”先匹配,再投送”。 即在绑定时设定一个 routing_key,消息的 routing_key 匹配时,才会被交换器投送到绑定的队列中去。

  • Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列如果配置了 routing_key 会被忽略。fanout 类型转发消息是最快的。

  • Headers Exchange

设置 header attribute 参数类型的交换机。headers 也是根据规则匹配,相较于 direct 和 topic 固定地使用 routing_key,headers 则是一个自定义匹配规则的类型。
在队列与交换器绑定时,会设定一组键值对规则,消息中也包括一组键值对( headers 属性),当这些键值对有一对,或全部匹配时,消息被投送到对应队列。

  • Topic Exchange

RabbitMQ - 图3
按规则转发消息(最灵活),转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

topic 和 direct 类似,只是匹配上支持了”模式”,在”点分”的 routing_key 形式中,可以使用两个通配符:

    • 表示一个词。
  • 表示零个或多个词。

    五、SpringBoot 集成 RabbitMQ

    1、配置 POM

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>

    2、配置 RabbitMQ 连接信息

    1. # 配置rabbitmq的安装地址、端口以及账户信息
    2. spring.application.name=spring-boot-rabbitmq
    3. spring.rabbitmq.host=127.0.0.1
    4. spring.rabbitmq.port=5672
    5. #spring.rabbitmq.username=admin
    6. #spring.rabbitmq.password=123456

    3、定义 Queue

    1. @Configuration
    2. public class RabbitConfig {
    3. @Bean
    4. public Queue helloQueue() {
    5. return new Queue("hello");
    6. }
    7. }

    4、发布消息

    1. @Component
    2. public class HelloSender {
    3. @Autowired
    4. private AmqpTemplate rabbitTemplate;
    5. public void send() {
    6. String context = "Hello " + new Date();
    7. System.out.println("Sender : " + context);
    8. this.rabbitTemplate.convertAndSend("hello", context);
    9. }
    10. }

    5、消费消息

    1. @Component
    2. // 队列名要去要订阅的队列的名一致。
    3. @RabbitListener(queues = "hello")
    4. public class HelloReceiver {
    5. @RabbitHandler
    6. public void process(String hello) {
    7. System.out.println("Receiver : " + hello);
    8. }
    9. }

    六、理解消息通信

    1、信道

    信道是建立在 TCP 连接内的虚拟连接。AMQP 的命令都是通过信道发送出去的。每条信道都会被指定一个唯一 ID。
    RabbitMQ - 图4

安装与管理

RabbitMQ 依赖于 Erlang 和 socat 环境,安装 RabbitMQ 前需要先安装 Erlang 和 socat 环境。

安装 RabbitMQ 提供的 Erlang 环境

https://github.com/rabbitmq/erlang-rpm
签名:

  1. ## primary RabbitMQ signing key
  2. rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
  3. ## modern Erlang repository
  4. rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey

新建 /etc/yum.repos.d/rabbitmq_erlang.repo 文件:

# In /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

安装 socat

yum install socat -y

安装 RabbitMQ

提前下载好 RabbitMQ 安装包,并上传。

rpm -ivh rabbitmq-server-3.8.17-1.el7.noarch.rpm

开启管理界面

rabbitmq-plugins enable rabbitmq_management

用户管理

创建账号:

rabbitmqctl add_user admin 123

设置用户角色:

rabbitmqctl set_user_tags admin administrator

设置用户权限

# set _permissions [-p <vhostpath>]<user> <conf> <write> <read>
# 用户 admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
rabbitmqctl set_permissions -p " /" admin ".*"".*"".*"

查看当前用户和角色

rabbitmqctl list_users