title: RabbitMQ
1.MQ引言
MessageQueue: 消息队列
模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。
1.1什么是MQ
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
1.2MQ有哪些
主要的MQ产品包括:RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere 等。
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal公司维护。
RabbitMQ严格的遵循AMQP协议,一种高级消息队列协议,帮助我们在进程之间传递异步消息。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
1.3不同MQ的特点
特性 | ActiveMq | RabbitMq | RocketMQ | Kafka |
---|---|---|---|---|
成熟度 | 成熟 | 成熟 | 比较成熟 | 成熟的日志领域 |
时效性 | 微秒级 | 毫秒级 | 毫秒级 | |
社区活跃度 | 低 | 高 | 高 | 高 |
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
优劣势总结 | 非常成熟,功能强大,在业内大量的公司以及项目中都有应用偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用 | erlang语言开发,性能极其好,延时很低;吞吐量到万级,MQ功能比较完备而且开源提供的管理界面非常棒,用起来很好用社区相对比较活跃,几乎每个月都发布几个版本分在国内一些互联网公司近几年用rabbitmq也比较多一些但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。 | 接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的 | kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集 |
1.4RabbitMQ介绍
RabbitMQ is the most widely deployed open source message broker.
为什么最受欢迎,应用最广泛?
1.使用AMQP协议 支持很多业务场景 比如 点对点 交换机路由 发布订阅模式 能适用很多业务场景
- 使用 erlang 语言 这个语言的特点 叫做面向并发编程 自身并发能力强 对socket 编程 支持友好
- 和spring 无缝整合
- 对数据一致性 数据丢失 错误处理 非常友好 可以不丢失任何数据 对错误数据恢复
rabbit 是部署最广泛的消息中间件
2.RabbitMQ安装
2.1下载
官网下载地址:https://www.rabbitmq.com/download.html
使用 docker-compose 安装 启动服务 进入服务内部 启动web 访问
cd /opt
mkdir docker_rabbitmq
cd docker_rabbitmq/
vim docker-compose.yml
# -d 后台作为守护进程启动
docker-compose up -d
docker-compose.yml 文件内容如下
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672 #rabbitmq 服务的端口号
- 15672:15672 # rabbitmq 图形化界面的端口号
volumes:
- ./data:/var/lib/rabbitmq
打开浏览器:http://192.168.174.128:15672 用户名 guest 密码 guest
2.2RabbitMQ架构【重点
】
2.2.1 官方的简单架构图
- Publisher - 生产者:发布消息到RabbitMQ中的Exchange
- Consumer - 消费者:监听RabbitMQ中的Queue中的消息
- Exchange - 交换机:和生产者建立连接并接收生产者的消息
- Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
- Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单的架构图 |
---|
2.2.2 RabbitMQ的完整架构图
完整架构图 |
---|
查看图形化界面并创建一个Virtual Host
创建一个全新test用户,全新的Virtual Host,并且将test用户设置上可以操作/test的权限
3.RabbitMQ的使用【重点
】
3.1 RabbitMQ的通讯方式
通讯方式 |
---|
3.2java连接rabbitmq
3.2.1创建maven 项目
3.2.2导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
3.2.3创建连接工具类
public class RabbitMQUtil {
public static Connection getConnection(){
// 创建连接mq 的连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 rabbitmq 的主机
factory.setHost("192.168.5.201");
// 设置端口号
factory.setPort(5672);
// 设置用户名
factory.setUsername("test");
// 设置密码
factory.setPassword("test");
// 设置连接哪个虚拟机
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
public static void main(String[] args) {
System.out.println(getConnection());
}
}
3.3 Hello-World
一个生产者,一个默认的交换机,一个队列,一个消费者
创建生产者,创建一个channel,发布消息到默认exchange,指定路由规则。
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
P:生产者 也就是要发送消息的程序
C:消费者 消息的接受者 会一直等待消息的到来
queue:消息队列 图中红色部分 类似一个邮箱 可以缓存消息 生产者向其中投递消息 消费者从其中取消息
package com.glls._1helloworld;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class HelloWorldTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel通道
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-World!";
// 发布消息
// 参数1:指定exchange 交换机 ,当前模式 是 生产者 ---队列----消费者 模式
// 参数2:指定路由的规则,使用具体的队列名称 队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 比如:MessageProperties.PERSISTENT_BASIC 表示持久化消息
// 参数4:指定发布的具体消息,byte[]类型
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外conn.close()-当前队列会被自动删除,当前队列只能被一个消费者消费
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld",true,false,false,null);
/**
* DefaultConsumer 是 Consumer 接口的实现类 接口中的定义的方法如下
这个不作为重点 了解即可
* handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
* handleCancelOk:basicCancel调用导致的订阅取消时被调用。
* handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
* handleDelivery:消息接收时被调用。
* handleRecoverOk:basic.recover-ok被接收时调用
* handleShutdownSignal:当Channel与Conenction关闭的时候会调用。
*/
//4. 开启监听Queue
//简易版自定义 Consumer
//只需要重写DefaultConsumer 的handleDelivery方法即可取出消息,额外属性新增属性等操作
DefaultConsumer consume = new DefaultConsumer(channel){
// 重写DefaultConsumer 的handleDelivery 的方法 方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK 开启自动确认机制 (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 消费回调接口
channel.basicConsume("HelloWorld",true,consume);
System.out.println("消费者开始监听队列!");
// 不要让消费者线程 结束 否则看不到监听效果了 如果没有这行代码 下面不要关闭通道和连接
System.in.read();
//5. 释放资源
channel.close(); // 不建议关闭 通道和连接
connection.close();
}
}
点对点的模型应用场景:
比如在注册的时候发送短信验证就可以以消息队列的形式 调用短信服务接口
再比如签到送积分的功能 可以用此模型 向积分服务 发送请求
3.3.1参数细节说明-durable
使用consume 这行代码来说明
# 参数1 队列名称
# 参数2 队列是否持久化 注意 是队列持久化 不是 队列中的数据持久化
# 当前设置为 false 启动消费者,在web页面会看到这个队列
channel.queueDeclare("HelloWorld",false,false,false,null);
启动消费者,web页面看到这个HelloWorld 队列
此时 停止消费者,这个记录依然存在, 停止docker中的rabbit ,再重启,这条记录就没有了,
# 参数2 是否持久化 设置为 true
channel.queueDeclare("HelloWorld",true,false,false,null);
此时 再停止 rabbit服务 重启rabbit 服务,这条记录依然存在,这就是 队列持久化,但是队列持久化 ,却不是队列中的数据持久化
要想让队列中的数据持久化,需要在生产者发布消息时 设置 队列中消息的属性
# 参数3 设置队列中消息的持久化
# 设置队列持久化 队列中消息持久化,rabbit 服务重启,队列 和 队列中的消息依然存在
channel.basicPublish("","HelloWorld",MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
3.3.2参数细节说明-exclusive
参数3 是否独占队列 当前队列 只允许当前连接可用 其他连接不能使用这个队列 true 独占队列 false 不独占 一般不独占
或者说 参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) 一般不希望独占
#参数3 是否独占队列 一般不独占 设置为false ,也就是除了当前连接,其他消费者也可以 连接这个队列
channel.queueDeclare("HelloWorld",true,false,false,null);
#参数3 是否独占队列 如果设置为 true ,则当前队列 被当前连接独占 运行状态 如下
channel.queueDeclare("HelloWorld",true,true,false,null);
此时 如果停掉 consumer 这个队列 就会消失, 另外需要注意一点 ,生产者 此时 就不能声明队列了,因为 生产者再声明队列 就和消费者中的队列 出现排他性问题 报异常 ,只有把 生产者中 声明 队列 //channel.queueDeclare(“HelloWorld”,true,true,false,null); 这行代码 注释掉, 才能正常发送 消息。
3.3.3参数细节说明-autoDelete
如果这个队列没有消费者在消费,队列自动删除
参数4
# 消费者 这行代码 设置 第四个 参数 没有消费者的时候 是否自动删除 true 自动删除 false 不自动删除
channel.queueDeclare("HelloWorld",true,false,true,null);
此时 停止消费者线程 或者 关闭连接 ,这条记录会消失 ,即 删除了这个 队列
HelloWorld模型总结:生产者发送消息 到 消息队列 ,消费者监听消息队列 消息的变化,一旦有消息 ,消费者就去消费,这种模型比较简单,一个生产者 对应一个消费者,也是一些简单的业务用的比较多的场景,消息队列在中间 就类似一个邮箱 一个缓存,生产者把消息发送到消息队列,被消费者监听到 就去处理消息 依次来完成对应的业务操作。
这种模型是最简单的模型,可能应对不了某些特殊的场景。比如 消费者在处理某些消息时 因为业务逻辑的复杂 或者 消费者处理过慢,会造成消息队列中的消息 不断造成消息的堆积,所以 我们希望 生产者生产的消息 可以给更多的消费者消费 ,这样就提高了 消费者处理消息 的 效率 ,当然 我们要保证 不同的消费者处理的消息是不同的 不然会造成业务的重复处理, 这样 就可以 处理消息快一些 不堵塞消息队列。基于这种需求 就是下面的Work模型
3.4Work
一个生产者,一个默认的交换机,一个队列,两个消费者
work queues 也称为task queues ,任务模型 。
HelloWorld 模型的不足 :当消息处理比较耗时时,可能生产消息的速度会远远大于消息的消费速度,长此以往 消息堆积的越来越多,无法及时处理 此时 就可以考虑work 模型,让多个消费者绑定到同一个队列,共同消费队列中的消息。队列中的消息 一旦被消费 就会消失 因此 任务是不会被重复执行的。
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
P:生产者 消息的发送者
C1:消费者 领取任务 并完成任务
C2: 消费者2 领取任务并完成任务
queue: 红色部分 队列
Qos: (Quality of Service)QoS 是消息的发送⽅(Sender)和接受⽅(Receiver)之间达成的⼀个协议
ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
消费者指定Qoa和手动ack
package com.glls._2work;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* Work 模型: 按劳分配 能者多劳
*
* 现在有两个消费者:消费者1处理消息处理的慢2秒一个
* 消费者2处理消息处理的快1秒一个 ,但是 在自动确认模式下
*
* channel.basicConsume("Work",true,consumer); Auto:ack true ,生产者发送了100条消息 会一下子全被 接受 在web 页面看不到被消费的过程
* 而且 消费者轮流依次消费 没有出现谁消费的快 就多消费一点的情况
* 如果改成 手动确认模式生产者发送的100 条消息 会逐渐被消费 在web页面能看到被消费的过程而且哪个消费者消费的快就会多消费
*
* 实际场景 我们希望 能者多劳 处理消息快的消费者 多处理一些
*
* 所以不建议使用消息的自动确认应该改为手动确认
*
*/
public class WorkTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-Work!";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
for(int i=0;i<100;i++){
channel.basicPublish("","Work",null,(i+msg).getBytes());
}
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
//1 指定当前消费者,一次消费多少个消息,没有过来的消息,还在队列中保存,这样设置,不会造成消息丢失
//因为假如不指定一次消费一条消息就有可能有多条消息到达消费者此时消费者一旦宕机到达消费者的消息也就丢了
// 所以消息从队列到消费者一次来一条免得过来多条消息在半路丢了
channel.basicQos(1); // 不要一次性的把消息都给消费者容易丢失一次给一条安全
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
// 参数1:long类型 标识队列中哪个具体的消息
// 参数2:boolean 类型 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
// 处理完了消息 手动确认一下 队列再删除这个消息 这种机制保证消息永不丢失
// 队列给消费者 一条消息 消费者收到消息 处理完了之后 手动确认, 确认了之后 队列才把消息删除 保证消息永不丢失
// 而且 消费者 确认一个消息 队列发送一个消息 消费者确认的快 队列发送的快 能者多劳
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume("Work",false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
// 参数1:long类型,标识队列中哪个具体的消息
// 参数2:boolean 类型 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume("Work",false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.5 Publish/Subscribe
fanout 扇出 也称为广播
一个生产者,一个交换机,多个队列,多个消费者
可以有多个消费者 每个消费者都有自己的 queue
每个队列都要绑定到Exchange 交换机
生产者发送消息 只能发送到交换机 交换机决定要发给哪个队列 生产者无法决定
交换机把消息发送给绑定过得所有队列
队列的消费者都能拿到消息 实现一条消息被多个消费者消费
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
消费者还是正常的监听某一个队列即可。
使用场景:比如 注册成功 发送一个消息 短信服务 邮件服务 积分服务 这些服务 作为消费者
来消费接受这个消息 生产实践用的也较多
package com.glls._3pubsub;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* 发布订阅模型 广播模型 要使用交换机了
*
* fanout : 扇出 广播
*
* */
public class PubSubTest {
@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建通道对象
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-PubSub!";
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建交换机 - 绑定某一个队列
//参数1: exchange 交换机的名称
//参数2: 指定exchange 交换机的类型
// FANOUT - pubsub 广播类型 , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
/**
*
* 广播模式下路由key 是没有用的 routingKey 没有意义 所以空着不写
* */
//第一次 发布消息 到 交换机
channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
//第二次 发布消息 到 交换机
channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName,"pubsub-exchange","");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明 交换机
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定 交换机 和 队列
channel.queueBind(queueName,"pubsub-exchange","");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.6 Routing
Routing之订阅模型-Direct
在Fanout模式中,一条消息 会被所有订阅的队列都消费。但是在某些场景下 我们希望不同的消息被不同的队列消费。这时 就要用到Direct 类型的Exchange
在Direct模型下:
1.队列和交换机的绑定 不能是任意绑定了 ,而是要指定一个RoutingKey (路由key)
2.消息的发送方在向Exchange 发送消息时 ,也必须指定消息的RoutingKey
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的RoutingKey 与消息的RoutingKey 完全一致 ,才会接收到消息
P:生产者 向Exchange 发送消息 发送消息时 会指定一个 RoutingKey
X:Exchange (交换机) 接受生产者的消息 然后把消息传递给 与RoutingKey 完全匹配的队列
C1 消费者 其所在队列指定了需要RoutingKey 为error 的消息
C2 消费者 其所在队列 指定了需要RoutingKey 为 info error 的消息
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
消费者没有变化
package com.glls._4routing;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* Exchange --- direct
*
* */
public class RoutingTest {
@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建连接通道对象
Channel channel = connection.createChannel();
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建exchange - 绑定某一个队列
//参数1: 交换机的名称
//参数2: 路由模式 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
//4. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
System.out.println("生产者发布消息成功!");
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明 交换机
channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);
//创建临时 队列
String queueName = channel.queueDeclare().getQueue();
//基于 路由key 绑定 队列 和 交换机
channel.queueBind(queueName,"routing-exchange","ERROR");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//基于 路由key 绑定 队列 和 交换机
channel.queueBind(queueName,"routing-exchange","ERROR"); //绑定路由Key 为ERROR 的
channel.queueBind(queueName,"routing-exchange","INFO"); // 绑定路由Key 为INFO 的
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.7 Topic
Routing之订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey 把消息路由到不同的队列,只不过Topic 类型的Exchange可以让队列在绑定RoutingKey 的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割, 例如 item.insert
#通配符
* (star) can substitute for exactly one word . 匹配不多不少恰好一个词
# (hash) can substitute for zero or more words . 匹配一个或多个词
# 如:
audit.# 匹配 audit.irs.corporate 或者 audit.irs 等
audit.* 只能匹配 audit.irs
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
消费者只是监听队列,没变化。
package com.glls._5topic;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class TopicTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//绑定多个队列 在消费者中 指定 临时队列
//channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
//channel.queueBind("topic-queue-2","topic-exchange","fast.#");
//channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
//3. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//4.创建临时队列
String queueName = channel.queueDeclare().getQueue();
//5.绑定队列和交换机
channel.queueBind(queueName,"topic-exchange","*.red.*");
//6 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//7. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//4.创建临时队列
String queueName = channel.queueDeclare().getQueue();
//5.绑定队列和交换机
channel.queueBind(queueName,"topic-exchange","*.*.rabbit");
channel.queueBind(queueName,"topic-exchange","fast.#");
//6 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//7. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
4.SpringBoot整合RabbitMQ【重点
】
4.1 SpringBoot整合RabbitMQ
4.1.1 创建SpringBoot工程
4.1.2 导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
4.1.3 编写配置文件
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
4.1.4 Hello-World
第一种使用配置类定义队列的方式
//===========配置类===========
@Configuration
public class RabbitMQConfig {
@Bean
Queue simpleQueue(){
return new Queue("simpleQueue");
}
}
//=============消费者=============
@Component
public class HelloConsume {
// 在配置类中创建queue在这里引用即可从simpleQueue队列中消费消息
// @RabbitListener也可以在类上使用
@RabbitListener(queues = "simpleQueue")
public void receive(String msg){
System.out.println("消费者接收到的消息是:" + msg);
}
}
//==========生产者========
@SpringBootTest
class SpringbootMqApplicationTests {
// 注入rabbitTemplate
@Autowired
RabbitTemplate rabbitTemplate;
// 生产者到simpleQueue对列发布消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
}
}
第二种使用注解的方式定义队列
//=========消费者==========
@Component
public class HelloConsume {
// 使用queuesToDeclare声明队列并从这个队列中消费消息
@RabbitListener(queuesToDeclare = @Queue(name = "simpleQueue"))
public void receive(String msg){
System.out.println("消费者接收到的消息是:" + msg);
}
}
//===========生产者============
@SpringBootTest
class SpringbootMqApplicationTests {
// 注入rabbitTemplate
@Autowired
RabbitTemplate rabbitTemplate;
// 生产者到simpleQueue对列发布消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
}
}
4.1.5Work
// 生产者
@Test
public void testWork(){
// Work 模型
for(int i=0;i<20;i++){
rabbitTemplate.convertAndSend("work","work模型: "+i);
}
}
-----------------------------------------------------------------------------
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
public void getMessage(Object message){
System.out.println("接收到消息1:" + message);
}
@RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
public void getMessage2(Object message){
System.out.println("接收到消息2:" + message);
}
}
4.1.6Pub/Sub
// 生产者
@Test
public void testFanout(){
// 生产发布模型 广播模型
rabbitTemplate.convertAndSend("boot-pubsub-exchange","","广播模式");
}
-----------------------------------------------------------
// 消费者
@Component
public class PubSubConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
})
public void getMessage(Object message){
System.out.println("消费者1:"+message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
})
public void getMessage2(Object message){
System.out.println("消费者2:"+message);
}
}
4.1.7route
// 生产者
@Test
public void testRouting(){
// 路由模式
rabbitTemplate.convertAndSend("boot-route-exchange","info","发送的是info的key的路由信息");
}
// ------------------------------------------------------
// 消费者
@Component
public class RouteConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-route-exchange",type = "direct"),
key = {"info","error"})})
public void getMessage1(Object message){
System.out.println("消费者1:"+message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-route-exchange",type = "direct"),key = {"info"})})
public void getMessage2(Object message){
System.out.println("消费者2:"+message);
}
}
4.1.8topic
// 生产者
@Test
void testTopic2(){
//rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
rabbitTemplate.convertAndSend("boot-topic-exchange","black.dog.and.cat","黑色狗和猫!!");
}
//消费者
package com.glls.bootrabbitmqdemo1._5topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
key = {"*.red.*","black.*.#"}
)
}
)
public void getMessage1(Object message){
System.out.println("接收到消息1:" + message);
}
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
key = {"black.*.#"}
)
}
)
public void getMessage2(Object message){
System.out.println("接收到消息2:" + message);
}
}
4.2 手动ack
要在消息消费完之后才告诉 rabbitmq 这个消息消费了,而不是还没消费就确认。
避免消息消费失败了但是消息已经被自动确认了 那么这个消息就相当于丢了 即丢消息
实现步骤:
1.在yml 配置文件 指定 手动 配置
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 手动指定 ack
- 在 消费者的方法参数中 指定参数
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
key = {"black.*.#"}
)
}
)
public void getMessage3(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息3:" + msg);
// 手动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
5.RabbitMQ的其他操作
RabbitMQ的消息确认机制 保证了 消息的可靠性传递
RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 确认是否到达交换机 使用 confirm 机制 确认是否到达 queue 使用 return 机制
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。 即消费者的 ack
消息确认的作用是什么?
为了防止消息丢失。消息丢失分为发送丢失和消费者处理丢失,相应的也有两种确认机制。
5.1消息的可靠性
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
5.1.1普通Confirm方式
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
* 普通 confirm
* 普通Confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。
* 实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,
* 客户端进行消息重传;
* */
public class HelloWorldTest {
private final static String QUEUE_NAME = "confirm";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
String msg = "This is a confirm message!";
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
for (int i = 0; i < 500; i++) {
channel.basicPublish("",QUEUE_NAME,null,("普通Confirm模式, 第" + (i + 1) + "条消息").getBytes());
// 判断消息是否发送成功
if(channel.waitForConfirms()){
// System.out.println("消息发送成功");
}else{
System.out.println("消息发送失败");
// 可以在这里进行重发操作
}
}
//4. 关闭频道和连接
channel.close();
connection.close();
}
// 消费者不变
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.2 批量Confirm方式。
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* 批量Confirm方式
* 批量Confirm模式:批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,
* 这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,
* 客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
* */
public class HelloWorldTest2 {
private final static String QUEUE_NAME = "confirm many";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//这里主要更改代码为发送批量消息后 再进行等待服务器确认 把确认消息 放在 for 循环 外面
for (int i = 0; i < 500; i++) {
String msg = "批量确认!" + i;
channel.basicPublish("",QUEUE_NAME,null,(" 批量Confirm模式, 第" + (i + 1) + "条消息").getBytes());
}
//3.3 确定批量操作是否成功
// 该方法会等到最后一条消息得到确认或者得到nack才会结束
// 也就是说在waitForConfirmsOrDie处会造成当前程序的阻塞
// 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
channel.waitForConfirmsOrDie();
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.3 异步Confirm方式。
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* 异步Confirm方式。
* 异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
*
* */
public class HelloWorldTest3s {
private final static String QUEUE_NAME = "asynch confirm ";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
long start = System.currentTimeMillis();
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// deliveryTag:是该消息的index
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
// multiple的值为true,true确认所有将比第一个参数指定的 delivery-tag 小的消息都得到确认
}
//handleNack 3s 10s xxx.. 重试
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
System.out.println("执行异步确认耗费时间: "+(System.currentTimeMillis()-start)+"ms");
//批量发送消息
for (int i = 0; i < 500; i++) {
channel.basicPublish("",QUEUE_NAME,null,(" 异步Confirm模式, 第" + (i + 1) + "条消息").getBytes());
}
/**
* 可以看到,虽然我们还是发送了500条消息,同样我们并没有收到500个ack消息 ,只收到较少的ack消息,
* 并且这些个ack消息的multiple域都为true,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,
* 说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;
*
* */
// 卡住程序 不结束 就可以看到 异步监听的效果了
System.in.read();
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.4 Return机制
Confirm只能监听到消息是否到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
开启Return机制,并在发送消息时,指定mandatory为true
package com.glls._7return;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* return 机制
* confirm 机制 只能确认 消息 发送到了 exchange 不能保证到 消息队列了
* 所以 开启 return 机制 监听 是否消息 到了 queue 中
*
* */
public class HelloWorldTest4 {
private final static String QUEUE_NAME = "unsynch confirm and return";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
// 1. 开启return机制 2. 发送消息时 指定 mandatory 参数
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 当消息没有送达到queue时,才会执行。
System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
}
});
//批量发送消息
for (int i = 0; i < 500; i++) {
// 2. 发送消息时 指定 mandatory为true 第三个参数
channel.basicPublish("",QUEUE_NAME,true,null,(" 异步Confirm和return, 第" + (i + 1) + "条消息").getBytes());
}
System.in.read(); // 卡住程序 不结束 就可以看到 异步监听的效果了
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.2 springboot 实现rabbitmq消息的可靠性
5.2.1配置文件
编写配置文件 开启Confirm 和 Return 机制
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 手动指定 ack
publisher-confirm-type: simple # 确认
publisher-returns: true # 消息可靠性
指定RabbitTemplate 对象 开启 Confirm 和 Return 并编写回调方法
生产者 消费者 没有什么变化
package com.qf.springbootmq.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author zed
* @date 2022/6/8 6:46
*/
@Component
public class MqConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息唯一标识"+correlationData);
System.out.println("确认结果"+ack);
System.out.println("失败原因"+cause);
if(ack){
System.out.println("消息已经送到了exchange");
}else{
System.out.println("消息没有送到exchange");
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 如果执行了这个方法 说明消息没有送到 queue 中
System.out.println("消息没有送到 queue ");
}
}
5.3避免消息的重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
5.4避免消息的重复消费-springboot 实现
5.4.1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
5.4.2 编写配置文件
spring:
redis:
host: 192.168.199.109
port: 6379
5.4.3 修改生产者
@Test
void testRepeat(){
rabbitTemplate.convertAndSend("boot-topic-exchange-repeat","black.dog.and.cat","黑色狗和猫!!",new CorrelationData());
//System.in.read();
}
5.4.4 修改消费者
package com.glls.bootrabbitmqdemo1._8repeat;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Component
public class TopicRepeatConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange-repeat",type = "topic"),
key = {"black.*.#"}
)
}
)
public void getMessage4(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
5.5 高级特性
5.5.1 TTL
Time to live 过期时间 设置消息的过期时间 有两种方式
- 指定一条消息的过期时间
- 给队列设置消息过期时间,队列中所有的消息都有同样的过期时间
应用场景:比如 下单未支付 则订单自动删除的实现
5.5.1.1 设置消息的过期时间
发送一条设置了过期时间的消息
细节: 过期时间 指的是 消息在 队列中的存活时间,所以 此时 为了看到效果 不用设置消费者监听队列 一直消费消息,如果 一直监听队列 消费消息的话 就看不到消息过期之后 消息从队列中消失的的效果了
所以 创建 交换机 创建 队列 别创建消费者
创建一个工具类 封装 交换机名字 队列名字 等 这些常量值
public class RabbitMQCommonConfig {
//ttl-direct-exchange 交换机
public static final String TTL_DIRECT_EXCHANGE = "ttldirectExchange";
//ttl-direct-queue 队列
public static final String TTL_DIRECT_QUEUE = "ttldirectQueue";
//ttl-direct-routingkey 路由key
public static final String TTL_DIRECT_ROUTINGKEY = "ttl_direct_routingkey";
}
在消费者端 创建 交换机 队列 Binging 这些bean ,注意 不创建消费者 是为了看到消息的过期时间属性效果
@Configuration
public class TTLConfig {
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE);
}
@Bean
public Queue ttlDirectQueue(){
return new Queue(RabbitMQCommonConfig.TTL_DIRECT_QUEUE);
}
@Bean
public Binding ttlDirectBinding(){
return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY);
}
}
在生产者端 发送一条 设置了 过期时间的消息
// 发送 ttl 消息
// 第一种方式 设置消息的 属性
@Test
public void sendTTLMessage(){
String msg = "测试设置了过期时间的消息"+new Date().toLocaleString();
MessageProperties messageProperties=new MessageProperties(); // 消息属性对象
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setExpiration("10000"); // 设置消息的过期时间为10秒
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.send(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE,RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY,message);
}
测试 发现 消息在队列 10 秒后 消失
5.5.1.2 设置队列的过期时间
直接在队列上设置消息的过期时间 这样 队列中的消息过期时间也都跟 队列设置的过期时间相同,
如果 消息也设置了过期时间 谁小谁优先级高
@Bean
public Queue ttlDirectQueue(){
// 在队列上 设置 此队列中 消息的过期时间
Map<String,Object> map=new HashMap<>();
// 队列中 所有的消息的过期时间 为 20秒
//map.put("x-message-ttl",20000);
// 队列中 所有的消息的过期时间 为 5秒
map.put("x-message-ttl",5000);
//return new Queue(TTL_DIRECT_QUEUE,true,false,false);
return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
}
说明: 如果同时指定了Message的TTL 和 Queue 的 TTL ,则优先较小的那一个
所以 最佳实践 是 在 队列上设置过期时间
注意点:
TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!
举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!
7.2 死信队列
队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:,
1,消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
2,消息由于消息有效期(per-message TTL)过期
3,消息由于队列超过其长度限制而被丢弃
注意,队列的有效期并不会导致其中的消息过期
7.3 延迟队列
什么是延迟队列?
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景:
那么什么时候需要用延时队列呢?考虑一下以下场景:
订单在十分钟之内未支付则自动取消。
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
账单在一周内未支付,则自动结算。
用户注册成功后,如果三天内没有登陆则进行短信提醒。
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
延迟队列的实现方式:
1,利用TTL+死信队列
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦;
2,利用RabbitMQ插件实现
安装一个插件即可,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。
把下载的插件 放到 容器内的 /plugins 目录内
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装插件
重启 rabbitmq 容器
// 常量配置
public class RabbitMQCommonConfig {
// 延时交换机
public static final String DELAYED_DIRECT_EXCHANGE = "delayedDirectExchange";
// 延时队列
public static final String DELAYED_DIRECT_QUEUE = "delayedDirectQueue";
// 延时路由KEY
public static final String DELAYED_DIRECT_ROUTE_KEY = "delayedDirectRouteKey";
}
package com.qf.springbootmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author zed
* @date 2022/6/8 21:45
* 延迟队列配置类
*/
@Configuration
public class DelayConfig {
@Bean
public CustomExchange delayedCustomExchange() {
String type = "x-delayed-message";
return new CustomExchange(RabbitMQCommonConfig.DELAYED_DIRECT_EXCHANGE, type, true, false, new HashMap<String, Object>() {{
put("x-delayed-type", "direct");
}});
}
@Bean
public Queue delayedDirectQueue() {
return new Queue(RabbitMQCommonConfig.DELAYED_DIRECT_QUEUE);
}
@Bean
public Binding delayedDirectBinding() {
return BindingBuilder.bind(delayedDirectQueue()).to(delayedCustomExchange()).with(RabbitMQCommonConfig.DELAYED_DIRECT_ROUTE_KEY).noargs();
}
}
package com.qf.springbootmq.consume;
import com.qf.springbootmq.config.RabbitMQCommonConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* @author zed
* @date 2022/6/8 21:57
* 延迟队列消费者
*/
@Component
public class DelayConsumer {
@RabbitListener(queues = RabbitMQCommonConfig.DELAYED_DIRECT_QUEUE)
public void receiver(String msg, Channel channel, Message message) throws IOException {
System.out.println("延迟队列消费消息:" + msg + ",时间是:" + new Date());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
package com.qf.springbootmq.controller;
import com.qf.springbootmq.config.RabbitMQCommonConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* @author zed
* @date 2022/6/8 22:00
* 延时队列发送方
*/
@RestController
public class DelayedController {
@Resource
RabbitTemplate rabbitTemplate;
@GetMapping("test")
public String testMsg(String msg) {
rabbitTemplate.convertAndSend(RabbitMQCommonConfig.DELAYED_DIRECT_EXCHANGE, RabbitMQCommonConfig.DELAYED_DIRECT_ROUTE_KEY, msg.getBytes(StandardCharsets.UTF_8), (message) -> {
message.getMessageProperties().setDelay(10000);
return message;
});
System.out.println("消息发送时间:" + new Date());
return "success";
}
}
6.MQ的应用场景
6.1异步处理
6.2应用解耦
6.3流量削峰
1.用户的请求,服务器接收到之后,首先写入到消息队列,加入消息队列长度超过最大值,则直接抛弃用户的请求或跳转到错误页面。
2.秒杀业务根据消息队列中的请求信息,在做后续处理。
7.练习
创建消息服务 发送阿里云手机短信验证码
腾讯COS 京东万象
::: details 本文档代码
:::