为什么选择 RabbitMQ?
RabbitMQ 的高性能是如何实现的?
什么是 AMQP 高级协议?
AMQP 核心概念是什么?
RabbitMQ 整体架构模型是什么样子的?
RabbitMQ 消息是如何流转的?
RabbitMQ 安装与使用
命令行,管控台解析
RabbitMQ 消息生产与消费
RabbitMQ 交换机详解
RabbitMQ 队列、绑定、虚拟主机、消息

1. RabbitMQ 介绍

1.1 什么是 RabbitMQ

RabbitMQ 是使用 Erlang 语言来编写的,基于 AMQP 协议,是跨平台、跨语言的一个消息代理和队列服务器。

1.2 为什么用 RabbitMQ

  • 开源、性能优秀、稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与 SpringAMQP 整合的非常完美,提供了丰富的 API
    • Spring 将 RabbitMQ 提供的原生 API 进行了再次封装,让它更简单,更易用,扩展性更强
  • RabbitMQ 集群模式非常丰富,并使用广泛和稳定
    • 支持表达式配置,设置集群策略
    • HA 模式
    • 镜像队列模型
  • 保证数据在不丢失的前提做到高可用性、可用性

1.3 RabbitMQ 高性能的原因

  • 使用了 Erlang 语言(面向并发的编程语言),使得 RabbitMQ 在 Broker 之间进行数据交互的性能十分优秀
    • Erlang 使用于交换机领域,进行数据交互的性能十分优秀
    • 有着与原生 Socket 一样的延迟

2. AMQP 介绍

2.1 什么是 AMQP

AMQP(Advanced Message Queuing Protocol 高级消息队列协议)

  • 是一个基于二进制的协议
  • 是一个提供统一消息服务的应用层标准高级消息队列协议
  • 是应用层协议的一个开放标准,为面向消息的中间见设计

2.2 AMQP 协议模型

入门 RabbitMQ 核心概念 - 图1

2.3 AMQP 核心概念

  • Server:又称 Broker,接受客户端的连接,实现 AMQP 实体服务。
  • Connection:连接,应用程序与 Broker 的网络连接。
  • Channel:网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个代表一个会话任务。
  • Message:消息,服务器和应用程序之间传递的数据。由 Properties 和 Body 组成。Properties 可以对消息进行修饰,如消息的优先级、延迟等高级特性,Body 是消息体的内容。
  • Virtual Host:虚拟地址,用于逻辑隔离,最上层的消息路由(划分具体的服务)。一个 Virtual Host 可以有若干个 Exchange 和 Queue,但同一个 Virtual Host 不能有相同名称 Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列,可对消息进行过滤。
  • Binding:Exchange 和 Queue 之间的虚拟连接,binding中可以包含 routing key。
  • Routing key:一个路由规则,虚拟机用它来确定如何路由一个特定消息。(交换路由)
  • Queue:又称为 Message Queue,消息队列,保存消息并将它们转发给消费者。

3. RabbitMQ 的架构与消息流转

3.1 整体架构

入门 RabbitMQ 核心概念 - 图2
入门 RabbitMQ 核心概念 - 图3

  • Broker:就是 RabbitMQ Server,消息队列服务进程,此进程包括两个部分:Exchange 和 Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布接受流程:

  • 发送消息
    • 生产者和 Broker 建立 TCP 连接。
    • 生产者和 Broker 建立通道。
    • 生产者通过通道消息发送给 Broker,由 Exchange 将消息进行转发。
    • Exchange 将消息转发到指定的 Queue(队列)
  • 接收消息
    • 消费者和 Broker 建立 TCP 连接 。
    • 消费者和 Broker 建立通道。
    • 消费者监听指定的 Queue(队列)
    • 当有消息到达 Queue 时 Broker 默认将消息推送给消费者。
    • 消费者接收到消息。

3.2 消息流转

入门 RabbitMQ 核心概念 - 图4
生产者生产出 Message 并投递到 Exchange 上。一个 Exchange 可以绑定多个 Message Queue,它根据路由策略(routing key)路由到指定的队列。最后由消费端去监听队列。

4. RabbitMQ 环境安装

RabbitMQ 官网安装指南:https://www.rabbitmq.com/download.html

4.1 Ubuntu 系统下安装

最新版的 RabbitMQ 支持 Ubuntu 16.04 ~20.10。

官方提供了两种安装方法:

4.1.1 安装 Erlang

  • RabbitMQ 是基于 Erlang 的,所以需要先安装 Erlang。
  • RabbitMQ 和 Erlang 安装包版本一定要对应,具体可以查看网关的对应关系说明:RabbitMQ Erlang Version Requirements
    • 本文使用的是目前最新版本:RabbitMQ 3.8.14 ~ Erlang 23.x。
  • 通过 Launchpad 的 Apt Repository 安装 Erlang
    • Distribution 对应的版本号
      • focal for Ubuntu 20.04
      • bionic for Ubuntu 18.04
      • xenial for Ubuntu 16.04
      • bionic for Debian Buster and later versions ```shell

        1. Install Essential Dependencies

        $ sudo apt-get update -y $ sudo apt-get install curl gnupg debian-keyring debian-archive-keyring -y

2. Add Repository Signing Key

Launchpad PPA signing key for apt

$ sudo apt-key adv —keyserver “keyserver.ubuntu.com” —recv-keys “F77F1EDA57EBB1CC”

3. Enable apt HTTPS Transport

$ sudo apt-get install apt-transport-https

4. Add a Source List File

This Launchpad PPA repository provides Erlang packages produced by the RabbitMQ team

$ sudo touch /etc/apt/sources.list.d/rabbitmq.list $ sudo vim /etc/apt/sources.list.d/rabbitmq.list

add following content:

deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

5. Install Erlang Packages

$ sudo apt-get update -y

This is recommended. Metapackages such as erlang and erlang-nox must only be used

with apt version pinning. They do not pin their dependency versions.

$ sudo apt-get install -y erlang-base \ erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \ erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \ erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

  1. <a name="E2nqE"></a>
  2. ### 4.1.2 安装 RabbitMQ with Apt
  3. RabbitMQ Team 在 Package Cloud 的 apt repository 维护了最新版的 RabbitMQ Packages。
  4. <a name="kT98k"></a>
  5. #### 4.1.2.1 Add Repository Signing Key
  6. - 为了使用 PackageCloud 的 apt repository,我们需要添加 apt-key 来验证信任安装包:
  7. ```shell
  8. # import PackageCloud signing key
  9. $ sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F6609E60DC62814E"

4.1.2.2 Add a Source List File

  • Distribution 对应的版本号
    • focal for Ubuntu 20.04
    • bionic for Ubuntu 18.04
    • xenial for Ubuntu 16.04
    • bionic for Debian Buster and later versions
      1. $ sudo vim /etc/apt/sources.list.d/rabbitmq.list
      2. deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
      3. deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main

      4.1.2.3 Install RabbitMQ Package

      1. $ sudo apt-get update -y
      2. $ sudo apt-get install -y rabbitmq-server

4.1.3 使用 RabbitMQ Server

4.1.3.1 Run Server

  1. # 1. 几种启动 server 方式
  2. $ service rabbitmq-server start
  3. $ rabbitmq-server statt &
  4. $ rabbitmqctl start_app
  5. # 验证 server 运行正常 5672 通信端口号
  6. $ sudo lsof -i:5672
  7. COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
  8. beam.smp 36906 rabbitmq 93u IPv6 6887829 0t0 TCP *:amqp (LISTEN)
  9. # 2. 几种停止 server 方式
  10. $ systemctl status rabbitmq-server
  11. $ rabbitmqctl stop_app
  12. $ rabbitmq-server stop
  13. # 3. 管理插件
  14. # 开启管理界面:http://localhost:15672 15672 管控台默认端口号
  15. $ sudo rabbitmq-plugins list
  16. $ sudo rabbitmq-plugins enable rabbitmq_management

4.1.3.2 Configuring RabbitMQ

  • server node 默认配置为以 system user rabbitmq 来运行。如果 node server 的数据或 log 日志存放位置改变了,文件或目录的权限要配置给这个 rabbitmq 用户。
  • 参考:RabbitMQ 配置指南

    4.1.3.3 Log Files and Management

  • 日志文件默认是在 /var/log/rabbitmq

  • 查看日志
    1. $ sudo journalctl --system | grep rabbitmq

4.2 Docker 环境下安装

使用 Docker 运行 RabbitMQ 非常简单,只需要执行一条简单的命令:

  1. $ docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7
  • -d : 后台运行容器
  • –name rabbitmq : 将容器的名字设为 rabbitmq
  • -h rabbitmq : 将容器的主机名设为rabbitmq,希望 RabbitMQ 消息数据持久化保存到本地磁盘是需要设置主机名,因为 RabbitMQ 保存数据的目录为主机名
  • -p 5672:5672 : 将容器的 5672 端口映射为本地主机的5672端口,这样可以通过本地的5672端口访问 rabbitmq
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq:将容器的 /var/lib/rabbitmq 目录映射为本地主机的 /var/lib/rabbitmq 目录,这样可以将 RabbitMQ 消息数据持久化保存到本地磁盘,即使 RabbitMQ 容器被删除,数据依然还在。

5. 命令行与管控台

5.1 基础操作

  • rabbitmqctl stop_app:关闭应用
  • rabbitmqctl start_app:启动应用
  • rabbitmqctl status:节点状态
  • rabbitmqctl add_user username password:添加用户
  • rabbitmqctl list_users:列出所有的用户
  • rabbitmqctl delete_user username:删除用户
  • rabbitmqctl clear_permissions -p vhostpath username:清除用户权限
  • rabbitmqctl list_user_permissions username:列出用户权限
  • rabbitmqctl change_password username newpassword:修改密码
  • rabbitmqctl set_permissions -p vhostpath username “.“ “.“ “.*”:设置用户权限
  • rabbitmqctl add_vhost vhostpath:创建虚拟主机
  • rabbitmqctl list_vhosts:列出所有虚拟主机
  • rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限
  • rabbitmqctl delete_vhost vhostpath:删除虚拟主机
  • rabbitmqctl list_queues:查看所有队列信息
  • rabbitmqctl -p vhostpath purge_queue blue:清除队列里的消息

5.2 高级操作

  • rabbitmqctl reset:移除所有数据,要在 rabbitmqctl stop_app 之后使用
  • rabbitmqctl join_cluster [—ram]:组成集群命令,[—ram/disc] 集群存储模式落在内存或磁盘上
  • rabbitmqctl cluster_status:查看集群状态
  • rabbitmqctl change_cluster_node_type disc:修改集群节点的存储形式(disc | ram)
  • rabbitmqctl forget_cluster_node [—offline]:忘记节点(删除节点)
  • rabbitmqctl rename_cluster_node oldnode1 newnode1 [olenode2] [newnode2…]:修改节点名称

6. QuickStart: 消息生产与消费 Java Demo

入门 RabbitMQ 核心概念 - 图5
代码实现步骤:

  • 获取连接工厂:ConnectionFactory
  • 获取一个连接:Connection
  • 通过连接创建数据通信信道,用于发送和接收消息:Channel
  • 在Broker上需要具体的消息存储队列:Queue
  • 需要生产者和消费者:Producer & Consumer

构建一个 SpringBoot 项目,POM 中添加 RabbitMQ 依赖 amqp-client:

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>3.6.5</version>
  5. </dependency>

创建消息生产者 Producter

  1. public class Procuder {
  2. public static void main(String[] args) throws Exception {
  3. //1 创建一个ConnectionFactory, 并进行配置
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("192.168.12.131");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. connectionFactory.setHost("192.168.12.131");
  9. connectionFactory.setUsername("shawn");
  10. //2 通过连接工厂创建连接
  11. Connection connection = connectionFactory.newConnection();
  12. //3 通过connection创建一个Channel
  13. Channel channel = connection.createChannel();
  14. // 4. 通过 Channel 发送数据 (exchange, routingKey, props, body)
  15. // 不指定 Exchange 时, 交换机默认是 AMQP default, 此时就看 RoutingKey,
  16. // RoutingKey 要等于队列名才能被路由, 否则消息会被删除
  17. for(int i=0; i < 5; i++){
  18. String msg = "Hello RabbitMQ!";
  19. //1 exchange 2 routingKey
  20. channel.basicPublish("", "test001", null, msg.getBytes());
  21. }
  22. //5 记得要关闭相关的连接
  23. channel.close();
  24. connection.close();
  25. }
  26. }

创建消息消费者 Consumer

  • queue:队列名称
  • durable:持久化,true 即使服务重启也不会被删除
  • exclusive:独占,true 队列只能使用一个连接,连接断开队列删除
  • autoDelete:自动删除,true 脱离了 Exchange(连接断开),即队列没有 Exchange 关联时,自动删除
  • arguments:扩展参数
  • autoAck:是否自动签收(回执)

    1. public class Consumer {
    2. public static void main(String[] args) throws Exception {
    3. //1 创建一个ConnectionFactory, 并进行配置
    4. ConnectionFactory connectionFactory = new ConnectionFactory();
    5. connectionFactory.setHost("192.168.12.131");
    6. connectionFactory.setPort(5672);
    7. connectionFactory.setVirtualHost("/");
    8. connectionFactory.setHost("192.168.12.131");
    9. connectionFactory.setUsername("shawn");
    10. //2 通过连接工厂创建连接
    11. Connection connection = connectionFactory.newConnection();
    12. //3 通过connection创建一个Channel
    13. Channel channel = connection.createChannel();
    14. //4 声明(创建)一个队列 (queue, durable, exclusive, autoDelete, args)
    15. String queueName = "test001";
    16. channel.queueDeclare(queueName, true, false, false, null);
    17. //5 创建消费者
    18. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    19. //6 设置Channel,监听队列(String queue, boolean autoAck, Consumer callback)
    20. channel.basicConsume(queueName, true, queueingConsumer);
    21. while(true){
    22. //7 获取消息
    23. Delivery delivery = queueingConsumer.nextDelivery();
    24. String msg = new String(delivery.getBody());
    25. System.err.println("消费端: " + msg);
    26. //Envelope envelope = delivery.getEnvelope();
    27. }
    28. }
    29. }

:::info

  • 先运行 Consumer 再运行 Producter,因为 Producter 中不会创建 Exchange 和 queue,都是在 Consumer 中声明创建后监听,等待 Producter 向指定 Exchange 和 RouteKey 投递消息。
  • 不指定 Exchange 时,交换机默认是 AMQP default,此时就看 RoutingKey,RoutingKey 要等于队列名才能被路由,否则消息会被删除。
  • 如果使用默认用户 “guest” / 密码 “guest” 访问 RabbitMQ Server,该密码只能从 localhost 进行访问。如果是跨主机访问,最好自己创建一个账户。 :::

运行结果:

  1. # consumer 运行结果
  2. 消费端: Hello RabbitMQ!
  3. 消费端: Hello RabbitMQ!
  4. 消费端: Hello RabbitMQ!
  5. 消费端: Hello RabbitMQ!
  6. 消费端: Hello RabbitMQ!

7. Exchange 交换机详解

7.1 交换机概念

用于接收消息,并根据路由键转发消息所绑定的队列。

  • 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
  • 黄色框:交换机和队列通过路由键有一个绑定的关系。
  • 绿色框:消费端通过监听队列来接收消息。

入门 RabbitMQ 核心概念 - 图6

7.2 交换机属性

  • Name:交换机名称
  • Type:交换机类型—— direct、topic、fanout、headers,sharding(此篇不讲)
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,即Exchange上没有队列绑定,自动删除该Exhcange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,大多数使用默认False
  • Arguments:扩展参数,用于扩展AMQP协议定制化使用

7.3 交换机类型

需要声明一个交换机(指定交换机类型),声明一个队列,建立它们的绑定关系(设置 RoutingKey)。

7.3.1 Direct Exchange

直连的方式。所有发送到 Direct Exchange 的消息被转发到 RoutingKey 中指定的 Queue。 :::info Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作,消息传递时,RouteKey 必须完全匹配才会被队列接受,否该消息会被抛弃。 ::: 入门 RabbitMQ 核心概念 - 图7
队列绑定时的 RoutingKey 要与生产者中发送时指定的 RoutingKey 一致。

消息消费者:Consumer4DirectExchange

  1. public class Consumer4DirectExchange {
  2. public static void main(String[] args) throws Exception {
  3. ConnectionFactory connectionFactory = new ConnectionFactory() ;
  4. connectionFactory.setHost("192.168.12.131");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. connectionFactory.setUsername("shawn");
  8. connectionFactory.setPassword("123");
  9. connectionFactory.setAutomaticRecoveryEnabled(true);
  10. connectionFactory.setNetworkRecoveryInterval(3000);
  11. Connection connection = connectionFactory.newConnection();
  12. Channel channel = connection.createChannel();
  13. //4 声明
  14. String exchangeName = "test_direct_exchange";
  15. String exchangeType = "direct";
  16. String queueName = "test_direct_queue";
  17. String routingKey = "test.direct";
  18. //表示声明了一个交换机
  19. //(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
  20. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
  21. //表示声明了一个队列
  22. //(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
  23. channel.queueDeclare(queueName, false, false, false, null);
  24. //建立一个绑定关系:
  25. channel.queueBind(queueName, exchangeName, routingKey);
  26. //durable 是否持久化消息
  27. QueueingConsumer consumer = new QueueingConsumer(channel);
  28. //参数:队列名称、是否自动ACK、Consumer
  29. channel.basicConsume(queueName, true, consumer);
  30. //循环获取消息
  31. while(true){
  32. //获取消息,如果没有消息,这一步将会一直阻塞
  33. Delivery delivery = consumer.nextDelivery();
  34. String msg = new String(delivery.getBody());
  35. System.out.println("收到消息:" + msg);
  36. }
  37. }
  38. }

消息生产者:Producter4DirectExchange

  1. public class Producter4DirectExchange {
  2. public static void main(String[] args) throws Exception {
  3. //1 创建ConnectionFactory
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("192.168.12.131");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. connectionFactory.setUsername("shawn");
  9. connectionFactory.setPassword("123");
  10. //2 创建Connection
  11. Connection connection = connectionFactory.newConnection();
  12. //3 创建Channel
  13. Channel channel = connection.createChannel();
  14. //4 声明
  15. String exchangeName = "test_direct_exchange";
  16. String routingKey = "test.direct";
  17. //5 发送
  18. String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
  19. //发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
  20. channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
  21. }
  22. }

7.3.2 Topic Exchange

有一定路由规则的。所有发送到 Topic Exchange 的消息被转发到所有关心 RoutingKey 中的指定 Topic 的 Queue 上。

Exchange 将 RoutingKey 和某个 Topic 进行模糊匹配(可使用通配符:”#” 可多词,”*” 一个词),此时队列需要绑定一个 Topic。
入门 RabbitMQ 核心概念 - 图8
发送消息时指定 topic,routingKey 队列中的 key 可以进行模糊匹配。

消息生产者:Producter4TopicExchange

  1. public class Producter4TopicExchange {
  2. public static void main(String[] args) throws Exception {
  3. //1 创建ConnectionFactory
  4. ...
  5. //2 创建Connection
  6. Connection connection = connectionFactory.newConnection();
  7. //3 创建Channel
  8. Channel channel = connection.createChannel();
  9. //4 声明
  10. String exchangeName = "test_topic_exchange";
  11. String routingKey1 = "user.save";
  12. String routingKey2 = "user.update";
  13. String routingKey3 = "user.delete.abc";
  14. //5 发送
  15. String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
  16. //发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
  17. channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
  18. channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
  19. channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
  20. channel.close();
  21. connection.close();
  22. }
  23. }

消息消费者:Consumer4TopicExchange

  1. public class Consumer4TopicExchange {
  2. public static void main(String[] args) throws Exception {
  3. ConnectionFactory connectionFactory = new ConnectionFactory() ;
  4. ...
  5. Channel channel = connection.createChannel();
  6. //4 声明
  7. String exchangeName = "test_topic_exchange";
  8. String exchangeType = "topic";
  9. String queueName = "test_topic_queue";
  10. //String routingKey = "user.*";
  11. String routingKey = "user.#";
  12. // 1 声明交换机
  13. //(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object)
  14. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
  15. // 2 声明队列
  16. //(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
  17. channel.queueDeclare(queueName, false, false, false, null);
  18. // 3 建立交换机和队列的绑定关系:
  19. channel.queueBind(queueName, exchangeName, routingKey);
  20. //durable 是否持久化消息
  21. QueueingConsumer consumer = new QueueingConsumer(channel);
  22. //参数:队列名称、是否自动ACK、Consumer
  23. channel.basicConsume(queueName, true, consumer);
  24. //循环获取消息
  25. while(true){
  26. //获取消息,如果没有消息,这一步将会一直阻塞
  27. Delivery delivery = consumer.nextDelivery();
  28. String msg = new String(delivery.getBody());
  29. System.out.println("收到消息:" + msg);
  30. }
  31. }
  32. }

因为使用了模糊匹配的 “user.#”,可以匹配到发送的三条消息。因此可以收到三条消息。

7.3.3 Fanout Exchange

不处理路由键。只需简单将队列绑定到交换上。


发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Fanout Exchange 转发消息速度最快。
入门 RabbitMQ 核心概念 - 图9
不需要 RoutingKey,只要交换机和队列有一个绑定关系,消息就可以转发。

消息消费者:Consumer4FanoutExchange

  1. public class Consumer4FanoutExchange {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. //4 声明
  5. String exchangeName = "test_fanout_exchange";
  6. String exchangeType = "fanout";
  7. String queueName = "test_fanout_queue";
  8. String routingKey = ""; //不设置路由键
  9. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
  10. channel.queueDeclare(queueName, false, false, false, null);
  11. channel.queueBind(queueName, exchangeName, routingKey);
  12. //durable 是否持久化消息
  13. QueueingConsumer consumer = new QueueingConsumer(channel);
  14. //参数:队列名称、是否自动ACK、Consumer
  15. channel.basicConsume(queueName, true, consumer);
  16. //循环获取消息
  17. while(true){
  18. //获取消息,如果没有消息,这一步将会一直阻塞
  19. Delivery delivery = consumer.nextDelivery();
  20. String msg = new String(delivery.getBody());
  21. System.out.println("收到消息:" + msg);
  22. }
  23. }
  24. }

消息生产者:Producter4FanoutExchange

  1. public class Producter4FanoutExchange {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. //2 创建Connection
  5. Connection connection = connectionFactory.newConnection();
  6. //3 创建Channel
  7. Channel channel = connection.createChannel();
  8. //4 声明
  9. String exchangeName = "test_fanout_exchange";
  10. //5 发送
  11. for(int i = 0; i < 10; i ++) {
  12. String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
  13. channel.basicPublish(exchangeName, "", null , msg.getBytes());
  14. }
  15. channel.close();
  16. connection.close();
  17. }
  18. }

7.3.4 Headers Exchange(不常用)

Headers 模式取消 Routingkey,使用 headers 中的 key/value(键值对)匹配队列。

8. 绑定、队列、消息、虚拟机详解

8.1 Binding 绑定

Exchange 和 Exchange、Exchange 和 Queue 之间的连接关系。
Binding 中可以包含 RoutingKey 或者参数。

8.2 Queue 消息队列

消息队列,实际存储消息数据。
属性:

  • Durability:是否持久化,Durable(是),Transient(否)
  • Auto delete:如选 yes,代表当最后一个监听被移除之后,该 Queue 会自动被删除

8.3 Message 消息

服务器和应用程序之间传送的数据;
本质上就是一段数据,由 Properties 和 Payload (Body)组成;
常用属性:

  • delivery mode
  • header(自定义属性)

其他属性(已经定义好的)

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • timestamp、type、user_id、app_id、cluster_id

消息生产者:

  1. public class Producter {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. Map<String, Object> headers = new HashMap<>();
  5. headers.put("my1", "111");
  6. headers.put("my2", "222");
  7. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  8. .deliveryMode(2) //持久化投递消息
  9. .contentEncoding("UTF-8")
  10. .expiration("10000")
  11. .headers(headers)
  12. .build();
  13. //4 通过Channel发送数据
  14. for(int i=0; i < 5; i++){
  15. String msg = "Hello RabbitMQ!";
  16. //1 exchange 2 routingKey
  17. channel.basicPublish("", "test001", properties, msg.getBytes());
  18. }
  19. //5 记得要关闭相关的连接
  20. channel.close();
  21. connection.close();
  22. }
  23. }

消息消费者:

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. ...
  4. while(true){
  5. //7 获取消息
  6. Delivery delivery = queueingConsumer.nextDelivery();
  7. String msg = new String(delivery.getBody());
  8. System.err.println("消费端: " + msg);
  9. Map<String, Object> headers = delivery.getProperties().getHeaders();
  10. System.err.println("headers get my1 value: " + headers.get("my1"));
  11. //Envelope envelope = delivery.getEnvelope();
  12. }
  13. }
  14. }

8.4 Virtual host 虚拟主机

虚拟地址,用于进行逻辑隔离,最上层的消息路由;
一个 Virtual host 里面可以由若干个 Exchange 和 Queue;
同一个 Virtual host 里面不能有相同名称的 Exchange 或 Queue;