什么是RabbitMQ
MQ(Message Queue) 消息队列
- 消息队列中间件,是分布式系统中的重要组件
- 主要解决:异步处理、应用解耦、流量消峰等问题
- 从而实现高性能、高可用、可伸缩和最终一致性的架构
- 使用较多的消息队列产品:RabbitMQ、RocketMQ、Kafka、ActiveMQ
异步处理
例如:用户注册后,需要发送验证邮箱和手机验证码;将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部全部完成后,返回给客户端。
应用解耦
场景:订单系统需要通知库存系统。如果库存系统出现异常,则订单调用库存失败,导致下单失败,由于:订单系统和库存系统耦合度太高
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
- 库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,在进行库存操作;
- 假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他后续操作了,实现了订单系统和库存系统的应用解耦
- 消息队列是典型的:生产者消费者模型
- 消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就实现了生产者和消费者的解耦
流量削峰
场景:秒杀、抢购等业务,针对高并发的场景。
因为流浪过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
用户的请求,在服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面。
也就是说秒杀成功的就是进入队列的用户。
背景知识
- AMQP
(Advanced Message Queuing Protocol)一个提供统一消息服务的应用层标准高级消息队列协议.
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息
并不受产品、开发语言等条件的限制
- JMS
Java Message Server,Java消息服务应用程序接口,一种规范和JDBC担任的角色类似。
是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间;或分布式系统中发送消息,进行异步通信
- 二者的区别:
JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式。
JMS必须是Java语言;AMQP知识协议和语言无关
- Erlang语言
Erlang 是一种通用的面向并发的编程语言,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。专门为通信应用设计的。RabbitMQ就是由Erlang编写的。
为什么选择RabbitMQ
RabbtiMQ:
- Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低。
- 它是企业级消息队列,经过大量实践考验的高可靠,一些一线大厂也都在使用。
- 有强大的WEB管理页面
- 强大的社区支持,为技术进步提供动力
- 支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能丰富
- 集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ,如果你想用一个性能高,但偶尔丢点数据可以使用kafka和zeroMQ,kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!.
RabbitMQ各组件功能
- Broker:消息队列服务器实体
- Virtual Host:虚拟主机
- 表示一批交换机、消息队列和相关对象,形成的整体
- 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
- 每个vhost本质就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制
- vhost是AQMP概念的基础,RabbitMQ默认的vhost是/,必须在链接时指定
- Exchange:交换器(路由)
- 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
- Queue:消息队列
- 用来保存消息直到发送给消费者
- 它是消息的容器,也是消息的终点
- 一个消息可以投入一个或多个队列
- 消息一直在队列里面,等待消费者连接到这个队列将其取走
- Banding:绑定,用于消息队列和交换机之间的关联
- Channel:通道(信道)
- 多路复用连接中的一条独立的双向数据流通道
- 信道是建立在真实的TCP连接内的虚拟链接
- AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
- 因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接
- Connection:网络连接,比如一个TCP连接
- Publisher:消息的生产者,也是一个向交换机发布消息的客户端应用程序
- Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
- Message:消息
- 消息是不具名的,它是由消息头和消息体组成
- 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久型存储)[消息的路由模式]等
RabbitMQ的安装和使用
要安装RabbitMQ必须要先安装erlang语言环境。要注意匹配的版本:https://www.rabbitmq.com/which-erlang.html 本篇文件使用的是RabbitMQ的3.8.6的版本,需要对应erlang的语言环境匹配
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
RabbitMQ安装启动
将软件包上传到Linux虚拟机的服务器。
1. 安装
rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
2. 启动后台管理插件
rabbitmq-plugins enable rabbitmq_management
3. 启动RabbitMQ
systemctl start rabbitmq-server.service # 启动rabbitMQ
systemctl status rabbitmq-server.service # 查看rabbitMQ服务的状态
systemctl restart rabbitmq-server.service # 重启rabbitMQ服务
systemctl stop rabbitmq-server.service # 停止rabbitMQ服务的状态
查看RabbitMQ的启动端口信息:ps -ef | grep rabbitmq
4. 访问管理端的地址
http://172.16.150.130:15672/
如果启动正常会显示如下界面:默认账号和密码:guest
但是输入账号和密码会显示,不允许远程连接,那么就需要添加远程账户
添加账户:
[root@localhost opt]# rabbitmqctl add_user prim 123456 # 添加账户信息
Adding user "prim" ...
[root@localhost opt]# rabbitmqctl set_user_tags prim administrator # 设置账户标签为超级管理员
Setting tags for user "prim" to [administrator] ...
[root@localhost opt]# rabbitmqctl set_permissions -p "/" prim ".*" ".*" ".*" # 设置权限信息
Setting permissions for user "prim" in vhost "/" ...
查询用户列表:
[root@localhost opt]# rabbitmqctl list_users
Listing users ...
user tags
prim [administrator] # 这个就是我们添加的用户
guest [administrator]
用新创建的账户,进行登录,界面显示如下:
:::tips
注意:
5672:RabbitMQ的提供给编程语言客户端链接的端口;
15672:RabbitMQ管理界面的端口;
25672:RabbitMQ集群的端口
:::
RabbitMQ 快速入门
- 创建虚拟主机
创建maven工程,引入依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> </dependencies>
连接MQ ```java public class ConnectionUtils { public static Connection getConnection() throws Exception {
//1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 在工厂对象中设置MQ的连接信息 - ip post vhost username password factory.setHost("172.16.150.130"); factory.setPort(5672); factory.setVirtualHost("/edu");//在第一步创建的虚拟主机 factory.setUsername("prim"); //在安装时创建的账号和密码 factory.setPassword("123456"); //3. 通过工厂获得与MQ的连接 Connection connection = factory.newConnection(); return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = getConnection(); System.out.println("connection:" + connection); //关闭链接 connection.close();
} }
输出如下信息,表明连接成功:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247070041-34c5dd19-ede6-40de-9597-08b227b5d729.png#align=left&display=inline&height=90&margin=%5Bobject%20Object%5D&name=image.png&originHeight=180&originWidth=1082&size=21403&status=done&style=none&width=541)
<a name="4Jv5R"></a>
## RabbitMQ模式
RabbitMQ提供了6种消息模型,第6种RPC并不是MQ,只学习前5种<br />[https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247670132-ccefb687-24ed-42cb-a495-2f06f257d860.png#align=left&display=inline&height=558&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1116&originWidth=1768&size=201150&status=done&style=none&width=884)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612247695309-2d77925b-f825-4e2a-ab48-e29543cd11a9.png#align=left&display=inline&height=552&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1104&originWidth=1168&size=139816&status=done&style=none&width=584)<br />5种消息模型,大体分为两类:<br />1和2属于点对点<br />3.4.5属于发布订阅模式(一对多)
- **点对点模式**:P2P 模式包含三个角色:
- 消息队列queue,发送者sender,接受者receiver
- 每个消息发送到一个特定的队列中,接收者从中获得消息
- 队列中保留这些消息,直到他们被消费或超时
- 特点:
- 每个消息只有一个消费者,一旦消费,消息就不在队列中了
- 发送者和接收者之间没有依赖性,发送者发送完成,不管接受者是否运行,都不会影响消息发送到队列中(例如,QQ给你发送消息,不管你看不看手机,反正我发了)
- 接收者成功接收消息之后需向对象应答成功(确认)
- 如果希望发送的每个消息都会被成功处理,那需要P2P
这种模式就像,送快递,给你放到了快递柜中,不管你收件人在哪,只需要把快递放到快递柜中,会发短信通知你你的快递在快递柜中了。<br />这种模式的性能不太好,队列会被占用,就像快递柜放不了多少快递。
- **发布订阅模式**:publish/subscribe
- pub/sub模式包含三个角色:交换机(exchange)、发布者(publisher)、订阅者(subcriber)
- 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
- 特点:
- 每个消息可以有多个订阅者
- 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
- 为了消费消息,订阅者必须保持运行状态:类似于,看电视、直播
- 如果希望发送的消息被多个消费者处理,可采用本模式
这种模式类似,关注了某个主播,当这个主播开播时,它就会通知订阅的所有人来观看直播
<a name="mRkWD"></a>
### 简单模式
> [https://www.rabbitmq.com/tutorials/tutorial-one-java.html](https://www.rabbitmq.com/tutorials/tutorial-one-java.html)
> RabbitMQ是一个消息代理,你可以把它想象成一个邮局,当你把你想要寄的邮件放到一个邮箱里,你可以确定邮递员先生最终会把邮件发送到你的收件人那里,在这个类比中,RabbitMQ是一个邮箱,一个邮局和一个邮递员
RabbitMQ本省只是接收,存储和转发消息,并不会对消息进行处理。处理信件的应该是收件人而不是邮局。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1612249454486-faedf188-1fee-420f-83b5-1c6abb571675.png#align=left&display=inline&height=61&margin=%5Bobject%20Object%5D&name=image.png&originHeight=122&originWidth=858&size=26430&status=done&style=none&width=429)
直接看代码如何实现:
<a name="qmQ2Z"></a>
#### 消息生成者
```java
package simplest;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtils;
/**
* @program: rabbitmq_quickstart
* @Description: 消息生产者
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 15:04
* @PackageName: simplest
* @ClassName: Sender.java
**/
public class Sender {
public static void main(String[] args) throws Exception {
String msg = "A:Hello Rabbit MQ";
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 在连接中创建通道
Channel channel = connection.createChannel();
//3. 创建消息队列
/**
* 参数1:队列名称
* 参数2:队列中的数据是否持久化
* 参数3:是否排外 是否支持扩展,当前队列只能自己用,不能给别人用
* 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列中是否还保存数据)
* 参数5:队列参数,没有参数传null
*/
channel.queueDeclare("queue1", false, false, false, null);
//4. 向指定的队列发送消息
/**
* 参数1:交换机名称,当前是简单模式-点对点模式 没有交换机,所以名称为""
* 参数2:目标队列的名称:queue1
* 参数3:设置消息的属性,没有属性则为null
* 参数4:消息内容 直接接收byte[]
*/
channel.basicPublish("", "queue1", null, msg.getBytes());
System.out.println("已发送:" + msg);
//5. 释放资源
channel.close();
connection.close();
}
}
消息接收者
Recer 不会关闭,监听消息发送
package simplest;
import com.rabbitmq.client.*;
import utils.ConnectionUtils;
import java.io.IOException;
/**
* @program: rabbitmq_quickstart
* @Description: 消息接受者
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 15:16
* @PackageName: simplest
* @ClassName: Recer.java
**/
public class Recer {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
java.lang.String s = new java.lang.String(body);
System.out.println("接收=" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认
channel.basicConsume("queue1",true, consumer);
}
}
启动消息生成者:
此时进入管理端查询消息队列信息
启动消息接收者,接收消息:可以接受到发送过来的消息
消息确认机制ACK
在上述的案例中,消息一旦被消费,消息就会立刻从队列中移除。
RabbitMQ如何得到消息被消费者接收?
- 如果消费者接收消息后,还没执行操作就抛出异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
- 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
- ACK:Acknowledge character 即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符,表示发来的数据已确认接收无误我们在使用HTTP请求时,HTTP的状态码200就是告诉我们服务器执行成功。
- 整个过程就像快递员将包裹送到你手里,并且需要你的签字并拍照回执
- 不过这种回执ACK分为两种情况:
- 自动ACK : 消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
- 手动ACK :消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
- 这两种ACK的情况,需要根据消息的重要性选择
- 如果消息不太重要,自动ACK比较方便
- 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛出异常宕机,那么消息就永久丢失了
我们将Recer
中的代码:将第二个参数改为false,消息需要手动确认
channel.basicConsume("queue1",false, consumer);
发送消息,然后在启动Recer
接收者接收消息:
可以看到消息队列中显示Unacked = 1
表示有一条消息没有确认
如何确认消息呢?
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("接收=" + s);
//表示:手动确认
//第二参数:是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
我们在重新发送消息,然后启动接收者,查看管理端的变化:可以看到消息队列中的消息都进行消费和ACK确认
工作队列模式(Works Queue)
在上述的简单模式中,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)
工作队列模式:消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取
消息消费者
假设两个消费者,来到了买肉串的地方,然后催促老板赶紧烤肉串(先启动两个消费者)
/**
* @program: rabbitmq_quickstart
* @Description: 消息接收者1,通过ACK确认机制
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 15:16
* @PackageName: simplest
* @ClassName: Recer.java
**/
public class Recer1 {
static int i = 1;//统计吃掉羊肉串的数量
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
channel.queueDeclare("test_work_queue", false, false, false, null);
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客1】吃掉:" + s + " !总共吃【" + i++ + "】串!");
//模拟网络延迟 吃掉1串花费0.2s
try {
Thread.sleep(200);
} catch (Exception e) {
}
//表示:手动确认
//第二参数:是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_work_queue", false, consumer);
}
}
/**
* @program: rabbitmq_quickstart
* @Description: 消息接收者1,通过ACK确认机制
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 15:16
* @PackageName: simplest
* @ClassName: Recer.java
**/
public class Recer2 {
static int i = 1;//统计吃掉羊肉串的数量
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
channel.queueDeclare("test_work_queue", false, false, false, null);
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客2】吃掉:" + s + " !总共吃【" + i++ + "】串!");
//模拟网络延迟 牙口不太好吃掉一串要花费 0.9s
try {
Thread.sleep(900);
} catch (Exception e) {
}
//表示:手动确认
//第二参数:是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_work_queue", false, consumer);
}
}
消息生产者
假设消息生产者,就是买肉串的老板,老板看到顾客来了,立马进行烤肉串,一次烤了100个肉串,给两个消费者
package work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtils;
/**
* @program: rabbitmq_quickstart
* @Description: 消息生产者
* @author: sufulu
* @version: 1.0.0
* @create: 2021-02-02 16:05
* @PackageName: work
* @ClassName: Sender.java
**/
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
//生产100个肉串
for (int i = 1; i <= 100; i++) {
String msg = "羊肉串 --> " + i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes());
System.out.println("新鲜出炉:" + msg);
}
//5. 释放资源
channel.close();
connection.close();
}
}
我们看一下结果:
顾客1(Recer1): 牙口好0.2s吃一个肉串。
顾客2(Recer2):上年纪了,牙口不好,需要0.9s吃一个肉串。
但是我们发现了一个问题,顾客1和顾客2 都吃了50串,顾客1先吃完了50串,然后顾客2慢慢吃完剩下的50串,顾客1就在那等着。
虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息。
例如:在工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A同学10天完成,B同学30天完成,A完成自己的编码部分,就无所事事了,等着B完成,这样不行的,遵循“能者多劳”
:::tips
效率高的多干点,效率低的少干点。
:::
如下图是由官网提供的解决思路:
可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的
worker。
即加上channel.basicQos(1)
快递一个一个送,送一个再送下一个,速度快的送件数多
能者多劳,必须要配合手动的ACK机制才可以生效。
面试题:避免消息堆积?
- workqueue,多个消费者监听同一个队列
- 接收消息后,通过线程池,异步消费
发布订阅模式
将一个消息传递给多个消费者。例如抖音的视频主,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以收到视频通知
X就是视频主,红色的队列就是粉丝,binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列。
X队列将信息通过信道发送给消费者,从而进行消费。
整个过程,必须先创建路由:
- 路由在生产者程序中创建
- 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// channel.queueDeclare("test_work_queue", false, false, false, null);
//声明路由,创建网红主播
//第一个参数:路由名称
//第二个参数:路由类型 一共有四种。
//fanout类型:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息就都会被转发到与该路由绑定的所有队列上)
channel.exchangeDeclare("text_exchange_fanout", "fanout");
String msg = "hello everyone";
//向绑定路由、网红主播的人发送消息
channel.basicPublish("text_exchange_fanout", "", null, msg.getBytes());
System.out.println("生产者:" + msg);
//5. 释放资源
channel.close();
connection.close();
}
}
消费者
public class Recer1 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null);
//关注网红,绑定路由
/**
* 参数1:队列名
* 参数2:路由名
*/
channel.queueBind("test_exchange_fanout_queue_1","text_exchange_fanout","");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_exchange_fanout_queue_1", true, consumer);
}
}
消费者2和消费者1是一样的代码,此处省略。
运行生产者,先生产网红主播。
之后运行消费者,关注网红主播。
然后生产者,将消息给网红主播,网红主播进行消息上传,通知给关注主播的粉丝
路由模式
路由会根据类型进行
定向分发
消息给不同的队列。 可以理解为快递公司的分拣中心。整个小区,东面的楼小张送货,西面的楼小王送货。
运行顺序:
- 先运行一次生产者,创建路由
- 在运行消费者,绑定路由
-
生产者
```java public class Sender { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
// channel.queueDeclare(“test_work_queue”, false, false, false, null);
//声明路由,创建网红主播 //第一个参数:路由名称 //第二个参数:路由类型 一共有四种。 //direct:根据路由键进行定向分发消息 channel.exchangeDeclare("text_exchange_direct", "direct"); String msg = "用户注册,【userid=s101】"; //推消息到路由器 //第二个参数必填,路由键 channel.basicPublish("text_exchange_direct", "select", null, msg.getBytes()); System.out.println("[用户系统]:" + msg); //5. 释放资源 channel.close(); connection.close();
} }
<a name="nUuyv"></a>
#### 消费者
```java
public class Recer1 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null);
//绑定路由 路由键的类型是:insert、update、delete就用queue1绑定
/**
* 参数1:队列名
* 参数2:路由名
* 参数3:路由键
*/
channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","insert");
channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","update");
channel.queueBind("test_exchange_direct_queue_1","text_exchange_direct","delete");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_exchange_direct_queue_1", true, consumer);
}
}
消费者2:
public class Recer2 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null);
//绑定路由
/**
* 参数1:队列名
* 参数2:路由名
* 参数3:路由键
*/
channel.queueBind("test_exchange_direct_queue_2", "text_exchange_direct", "select");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_exchange_direct_queue_2", true, consumer);
}
}
生产者,发送路由键为:insert、update、delete,则消费者1 接收消息
生产者,发送路由键为:select,则消费者2接收消息
通配符模式
通配符模式和路由模式90%是一样的,唯独的区别就是路由键支持模糊匹配。 匹配符号:
*
: 只能匹配一个词,正好一个词,多一个不行,少一个也不行#
:能匹配0个或更多个次
生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// channel.queueDeclare("test_work_queue", false, false, false, null);
//声明路由,创建网红主播
//第一个参数:路由名称
//第二个参数:路由类型 一共有四种。
//direct:根据路由键进行定向分发消息
//topic:模糊匹配的定向分发
channel.exchangeDeclare("text_exchange_topic", "topic");
String msg = "订单下单";
//推消息到路由器
//第二个参数必填,路由键
channel.basicPublish("text_exchange_topic", "order.down", null, msg.getBytes());
System.out.println("[用户系统]:" + msg);
//5. 释放资源
channel.close();
connection.close();
}
}
消费者
消费者1:接收user.#
用户相关的消息
public class Recer1 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_exchange_topic_queue_1", false, false, false, null);
/**
* 绑定用户相关的消息:user.#
* 参数1:队列名
* 参数2:路由名
* 参数3:路由键
*/
channel.queueBind("test_exchange_topic_queue_1","text_exchange_topic","user.#");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_exchange_topic_queue_1", true, consumer);
}
}
消费者2,接收product.# 和 order.#
消息
public class Recer2 {
public static void main(String[] args) throws Exception {
//1. 获得连接
Connection connection = ConnectionUtils.getConnection();
//2. 获得通道
Channel channel = connection.createChannel();
//queueDeclare 该方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
//声明队列
channel.queueDeclare("test_exchange_topic_queue_2", false, false, false, null);
/**
* 绑定商品和订单相关的消息
* 参数1:队列名
* 参数2:路由名
* 参数3:路由键
*/
channel.queueBind("test_exchange_topic_queue_2", "text_exchange_topic", "product.#");
channel.queueBind("test_exchange_topic_queue_2", "text_exchange_topic", "order.#");
//3. 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】:" + s);
}
};
//4. 监听队列
//第二个参数:true 表示自动消息确认;false 手动确认消息
channel.basicConsume("test_exchange_topic_queue_2", true, consumer);
}
}
运行测试:
:::tips
注意运行顺序:首先运行生产者,创建路由,然后在运行消费者,需要发送消息则继续运行生产者。
:::
分别发送:商品信息product.price
和订单信息order.down
再发送用户信息user.register
持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失呢?
- 消费者的ACK确认机制,可以放置消费者丢失消息
- 万一在消费者消费之前,RabbitMQ服务器宕机,那消息也会丢失
将消息持久化,那么路由和队列都要持久化才可以。
在管理端,路由的Features代D
的就是持久化的路由,队列也是一样的Fetures为D
就表示持久化
为了演示测试效果,首先将RabbitMQ服务重启:systemctl restart rabbitmq-server.service
然后在管理端查看路由和队列已经没有了之前创建的路由和队列了,因为他们不是持久化的。
然后修改我们的上述写的通配符模式的代码:
生产者:路由持久化,exchangeDeclare
方法的第三个参数为true
,并且信道MessageProperties.PERSISTENT_TEXT_PLAIN
设置
channel.exchangeDeclare("text_exchange_topic", "topic", true);
String msg = "订单下单";
//推消息到路由器
//第二个参数必填,路由键
channel.basicPublish("text_exchange_topic", "order.down", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
消费者:队列持久化,第二个参数为true表示持久化队列
channel.queueDeclare("test_exchange_topic_queue_2", true, false, false, null);
运行,然后查看管理端:
路由信息:
队列信息:
然后我们在重新启动RabbitMQ服务,查看我们创建的路由和队列是否还存在,持久化的再次重新肯定还是存在的。