1.消息服务概述
1.1 什么是消息中间件
MQ(全称Message Queue,意为消息队列),是消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。MQ本质上是一个队列,以先进先出的顺序进行消息传递。在互联网架构中,MQ是一种非常常见的上下游”逻辑解耦+物理解耦”的消息通信服务。
在未使用MQ技术的应用中,系统之间是直接相互通信的,系统间的相互调用当并发量较高的情况下,下层服务依赖上层服务可能会因为响应不及时从而导致服务器宕机,应用瘫痪。
当应用加入MQ消息队列后,系统间的互相调用可以使用MQ进行管理,生产者将消息发送到中间件,当消费者从中间件中获取消息则表示通信完成。当并发量较高的情况下,系统间的通信已经交给MQ管理,下层服务无需等待上层服务的响应,直接从消息中间件中获取消息即可。
1.2 为什么要使用消息中间件
在多数应用尤其是分布式系统中,消息服务是不可或缺的重要部分,它使用起来比较简单,同时解获了不少难题,例如异步提速、应用解耦、流量削锋、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高扩展的系统。
(1)应用解耦
场景说明,用户下单后订单服务需要通知库存服务。下面我们使用图示的方式直观展示上述需求的不同处理方式,如图所示。
从上图中我们可以看出,如果使用传统方式处理订单业务,用户下单后,订单服务会直接调用库存服务接口进行库存更新,这种方式有一个很大的问题是,一旦库存系统出现异常,订单服务会失败,导致订单丢失。如果使用消息服务模式,订单服务的下订单消息会快速写入消息队列,库存服务会监听并读取到订单,从而修改库存。相对于传统方式,消息服务模式显得更高效可靠。
(2)异步提速
场景说明︰用户注册后,系统需要将信息写入数据库并发送注册邮件和注册短信通知。下面我们使用图示的方式直观展示上述场景的不同处理方式,如下图所示。
在上图中,针对上述注册业务的场景需求,处理方式有3种,如下所示:
- 串行处理方式:用户发送注册请求后,服务器会先将注册信息写入到数据库,依次发送注册邮件和短信消息,服务器只有在消息处理完毕后才会将处理结果返回给客户端。这种串行处理消息的方式非常耗时,用户体验不友好。
- 并行处理方式︰用户发送请注册请求后,将注册信息写入到数据库,同时发送注册邮件和短信,最后返回给客户端。这种并行处理的方式在一定程度上提高了后台业务处理的效率,但如果遇到较为耗时的业务处理,仍显得不够完善。
- 消息服务处理方式:可以在业务中嵌入消息服务中间件进行业务处理,这种方式先将注册信息写入到数据库,在极短的时间内将注册信息写入消息队列后,即可返回响应信息。此时,前端业务不需要理会不相干的后台业务处理,而发送邮件和短信的业务会自动读取消息队列中的相关消息,进行后续业务处理。
(3)流量削峰
场景说明︰秒杀活动是流量削峰的一种应用场景,由于服务器处理资源能力有限,因此出现峰值时很容易造成服务器宕机,用户无法访问的情况。为了解决这个问题,通常会采用消息队列缓冲瞬时高峰流量,对请求进行分层过滤。从而过滤掉一些请求。下图描述的是流量削峰场景的处理方式。
针对上述秒杀业务的场景需求,如果专门增设服务器来应对秒杀活动期间的请求瞬时高峰的话,在非秒杀活动期间,这些多余的服务器和配置显得有些浪费。如果不进行有效处理的话,秒杀活动瞬时高峰流量请求有可能压垮服务,因此在秒杀活动中加入消息服务是较为理想的解决方案。通过在应用前端加入消息服务,先将所有请求写入到消息队列。并限定一定的阈值,多余的请求直接返回秒杀失败,秒杀服务会根据秒杀规则从消息队列中读取并处理有限的秒杀请求。
1.3 使用消息中间件的注意事项
消息服务在分布式系统中有一定的优势,但也存在一定的不足,例如系统可用性降低、系统复杂度提高、一致性问题等。
1.系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
2.系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
3.一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
经过上述分析,使用消息服务中间件的条件如下所示:
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
- 容许短暂的不—致性。
- 解耦提速削峰效率高。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
1.4 常用的消息中间件
消息队列中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前开源的消息中间件可谓是琳琅满目,大家耳熟能详的有很多,比如ActiveMQ,RabbitMQ,Kafka,RocketMQ等。目前市面上的消息中间件各有侧重点,选择合适自己能够扬长避短的无疑是最好的选择,接下来我们针对常用的消息服务中间件进行介绍。
(1)ActiveMQ
ActiveMQ是Apache公司出品的、采用Java语言编写的、完全基于UJMS(Java Message Service)规范的面向消息的中间件,它为应用程序提供高效、可扩展的、稳定的、安全的企业级消息通信。ActiveMQ丰富的APlI和多种集群构建模式使得它成为业界老牌的消息中间件,广泛的应用于中小型企业中。相较于后续出现的RabbitMQ,RocketMQ,Kafka等消息中间件来说,ActiveMQ性能相对较弱,在如今的高并发大数据处理的场景下显得力不从心,经常会出现一些问题,例如消息延迟、堆积、堵塞等。
(2)Kafka
Kafka是Apache软件基金机会开发的一个开源流处理平台,它是一种高吞吐量的分布式订阅消息系统,采用Scala和Java语言编写,提供了快速,可扩展的,分布式的,分区的和可复制的日志订阅服务,其主要特点是追求高吞吐量,适合用于产生大量数据的互联网服务的数据收集业务。其缺点是Kafka单机超过64个队列/分区,Load 会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试,社区更新较慢。
(3)RocketMQ
RocketMQ是阿里巴巴公司的开源产品,目前也是Apache公司的顶级项目,使用纯Java开发,具有高吞吐量、高可用、适合大规模分布式系统应用的特点。RocketMQ的思路起源于Kafka,对消息的可靠传输以及事务性做了优化,目前在阿里巴巴中被广泛应用于交易、充值、流计算、消息推送、日志流式处理场景,不过维护上稍微有些麻烦。
(4) RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议实现。AMQP是为应对大规模并发活动而提供统一消息服务的应用层标准高级消息队列协议,专门为面向消息的中间件设计,该协议更多应用在企业系统内,对数据系一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。正是基于AMQP协议的各种优势性能,使得RabbitMQ消息中间件在应用开发中越来越受欢迎。
AMQP是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。
在实际项目技术选型时,在没有特别要求的场景下,通常会选择使用RabbitMQ作为消息中间件。如果针对的是大数据业务,推荐使用Kafka或者是RocketMQ作为消息中间件。
2. RabbitMQ消息中间件
2.1 RabblitMQ简介
RabbitMQ是基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理,Spring使用RabbitMQ通过AMQP协议进行通信,在Spring Boot中对RabbitMQ进行了集成管理。
2007年,Rabbit 技术公司基于AMQP(高级消息队列协议)标准开发的,可服用的企业消息系统,是当前最主流的消息中间件之一。RabbitMQ采用Erlang 语言开发,Erlang 语言专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ优势在于高并发特性,性能较好,吞吐量到万级,MQ功能比较完备、健壮、稳定、易用、跨平台、支持多种语言,如: Python、Ruby、.NET、Java、C、PHP等。开源提供的管理界面非常丰富完善,设置活跃度高,更新频率相当高,但是RabbitMQ商业版需要收费,学习成本比较高。
2.2 RabbitMQ核心概念
RabbitMQ消息中间件共有4大核心,分别是生产者、交换机、队列和消费者。
(1)生产者
产生数据发送消息的程序称为生产者,例如:手机厂商、工厂厂家等。
(2)交换机
交换机是RabbitMQ非常重要的一个组件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须明确地知道如何处理它接收到的消息,是将这些消息推送到特定队列,还是推送到多个队列,或者是把消息丢弃,这个得由交换机的类型决定。
(3)队列
对列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式。
(4)消费者
消费与接收具有相似的含义,消费者大多时候是一个等待接收消息的程序。需要注意的是,生产者、消费者和消息中间件很多时候并不在同一个机器上。同一个应用程序既可以是生产者,也可以是消费者。
2.3 RabbitMQ架构
- Broker:接收和分发消息的应用。
- Connection:生产者(Producer)/消费者(Consumer)与RabbitMQ Server之间的TCP连接。
- Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP连接的开销将是巨大的,效率也较低。Channel是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel 之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP连接的开销。
- Exchange︰消息Message到达 broker 的第一站(交换机),根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的类型有:direct,topic和fanout等。
- Queue:消息最终被送到队列这里,等待消费者取走该消息。
- Binding:交换机exchange和队列queue之间的虚拟连接, binding 中可以包含 routing key,Binding信息被保存到exchange 中的查询表中,用于message 的分发依据
2.4 RabbitMQ核心
RabbitMQ消息队列主要队列模式:有简单模式,工作队列模式,发布订阅模式,路由模式,通配主题模式,RPC模式和发布确认模式。
3. RabbitMQ环境安装
3.1 下载RabbitMQ
在RabbitMQ官方网站下载对应操作系统的RabbitMQ Server,此处以centos 8为例进行下载安装。
需要注意的是,在Centos 8操作系统中,下载的版本如上图所示,名称含有“el8”的软件才能够正常的在CentoS8操作系统中安装。对于Red Hat 7或CentoS 7,将“el8”替换为“el7”的版本进行下载。
此外,安装RabbitMQ还需要Erlang 环境,下载Erlang软件版本需要与RabbitMQ版本一致,可参考官方文档说明进行下载go语言。
3.2 安装RabbitMQ
将下载好的Erlang和RabbitvQ 上传至/usr/local/software/rabbitmq目录下(如目录不存在则需要提前创建)。
(1)安装Erlang
RabbitMQ基于Erlang开发环境,安装RabbitMQ前需要安装Erlang环境,命令如下所示:
(2)安装RabbitMQ服务依赖#安装Erlang
rpm -ivh erlang-23.3.4.4-1.el8.x86_64.rpm --nodeps --force
#查看Erlang版本
erl -v
(3)安装RabbitMQ#安装rabbitmq-server依赖环境
yum -y install socat
#安装RabbitMQ服务
rpm -ivh rabbitmq-server-3.8.18-1.el8.noarch.rpm --nodeps --force
3.3 启动、关闭和重启服务
(1)启动RabbitMQ服务
(2)关闭RabbitMQ服务systemctl start rabbitmq-server
(3)查看RabbitMQ服务状态systemctl stop rabbitmq-server
(4)重启RabbitMQ服务systemctl status rabbitmq-server
(5)设置开机自动启动systemctl restart rabbitmq-server
systemctl enable rabbitmq-server
3.4 开启web管理界面
默认情况下,RabbitMQ是没有安装Web端的客户端插件,需要自行安装才可生效。
(1)安装web管理界面
(2)重启RabbitMQ服务rabbitmq-plugins enable rabbitmq_management
注意:web管理界面安装完毕后需要重启RabbitMQ服务。
(3)访问RabbitMQsystemctl restart rabbitmq-server
在windows系统下访问RabbitMQ管理界面,需要开放15672端口。如果RabbitMQ安装在服务器(阿里云、腾讯云等),需要在对应服务器的安全组中开放15672端口。
在windows系统的浏览器中输入 http:/ /服务器IP地址:15672/进行访问,如下图所示:#开放15672端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
#配首立即生效
firewall-cmd --reload
注意:RabbitMQ有一个默认账号和密码为guest 的用户,默认情况下,该用户只能在localhost本机下访问,所以需要添加一个远程登录的用户。
RabbitMQ的预览界面:
RabbitMQ连接界面:
RabbitMQ信道界面:
RabbitMQ交换机界面:
RabbitMQ队列界面:
RabbitMQ添加admin界面:
补充:Centos7开放及查看端口 ```java 1、开放端口
firewall-cmd —zone=public —add-port=5672/tcp —permanent # 开放5672端口
firewall-cmd —zone=public —remove-port=5672/tcp —permanent #关闭5672端口
firewall-cmd —reload # 配置立即生效
2、查看防火墙所有开放的端口
firewall-cmd —zone=public —list-ports
3、关闭防火墙
如果要开放的端口太多,嫌麻烦,可以关闭防火墙,安全性自行评估
systemctl stop firewalld.service
4、查看防火墙状态
firewall-cmd —state
5、查看监听的端口
netstat -lnpt
PS:centos7默认没有 netstat 命令,需要安装 net-tools 工具,yum install -y net-tools
6、检查端口被哪个进程占用
netstat -lnpt |grep 5672
7、查看进程的详细信息
ps 6832
8、中止进程
kill -9 6832
<a name="ut0Ln"></a>
## 3.5 授权配置
RabbitMQ在默认情况下只有 guest 用户,且该用户无法实现远程登录,所以需要新增新的用户并双操作。<br />(1)新增用户
```basic
#语法: rabbitmqctl add_user账号密码
#创建用户名和密码均为admin的账号
rabbitmqctl add_user admin admin
(2)设置用户操作权限
#语法: rabbitmqctl set_user_tags 账号角色 权限
#分配超级管理员权限
rabbitmqctl set_user_tags admin administrator
用户角色级别依次如下所示:
- administrator(超级管理员)∶可以登录控制台,查看所有信息,对RabbitMQ进行管理。
- monitoring(监控者)∶可以登录控制台,查看所有信息。
- policymaker(策略制定者)∶可以登录控制台,指定策略。
- management(普通管理员)∶可以登录控制台。
(3)为用户添加资源权限
#语法: rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
#为用户设置administrator角色
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
(4)修改用户密码
#语法: rabbitmqctl change_password 用户名新密码
#修改用户密码为123456
rabbitmqctl change_password admin 123456
(5)删除用户
#语法: rabbitmqctl delete_user 用户名
#册除用户
rabbitmqctl delete_user admin
(6)查询用户列表
#语法:rabbitmqctl list_users
#册除用户
rabbitmqctl delete_user admin
3.6 基于Docker安装RabbitMQ
基于Docker安装RabbitMQ,需要提前安装好Docker并启动Docker服务,随后拉取RabbitMQ镜像,创建并运行容器即可使用RabbitMQ。
(1)获取RabbitMQ镜像
#拉取RabbitPQ镜像
docker pull rabbitmq:management
(2)创建并运行容器
docker run -id --name=myrabbit -p 15672:15672 rabbitmq:management
- —hostname:指定容器主机名称
- —name:指定容器名称
- -p:将MQ端口号映射到宿主机
需要注意的是,以上2个步骤的命令进行安装,RabbitMQ没有设置用户和密码及该用户对应的权限,可以使用以下命令(运行时设置用户名和密码)进行设置:
docker run -id --name myrabbit \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-p 15672:15672 \
-p 5672:5672 \
-p 61613:61613 \
-p 1883:1883 \
rabbitmq:management
4. 简单模式
4.1 简单模式概述
发送单个消息的生产者(发送者)和接收消息并将其打印出来的消费者(接收者)。这是消息传递的”Hello World”。在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ代表消费者保留的消息缓冲区。
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
4.2 创建Maven工程
创建名为rabbitmqe1的Maven项目,并在该项目中的pom.xml配置文件加入RabbitMQ核心依赖,代码如下所示:
<!-- RabbitMQ 客户端依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.o</version>
</dependency>
4.3 编写生产者
在com.manong.hello包下创建编写消息生产者Producer类,代码如下所示:
package com.manong.hello;
import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.ConnectionFactory;
/*
生产者:用于发送消息
*/
public class Producer {
//队列名称
public static final string QUEUE_NAME="hello-queue";
public static void main(string[] args) throws Exception{
//1.创建工厂对象
ConnectionFactory connectionFactory = new ConnectionF actory();
//2.设置参数信息
//主机地址,默认为localhost
connectionFactory.setHost("192.168.137.129");
//连接端口,默认为5672 ,web端口界面为15672 注意:Linux一定要开放端口号,关闭防火墙
connectionFactory.setPort(5672);
//虚拟主机名称,默认为/
connectionFactory.setvirtualHost("/manong");
//连接用户名,默认为guest
connectionFactory.setUsername("admin");
//连接密码,默认为guest
connectionFactory.setPassword("123456");
//3.创建连接对象
Connection connection = connectionFactory.newConnection();
//4.创建频道对象
Channel channel = connection.createchanne1();
//5.创建队列对象(声明队列)
//参数1:队列名称
//参数2:是否持久化队列
//参数3:是否独占本次连接
//参数4:是否在不使用队列时自动删除
//参数5:队列其他参数
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//6.发送信息
string message = "Hello,RabbitMQ~o~";
/*
*参数1:交换机名称,如果没有指定则使用默认Default Exchange
*参数2:路由key,简单模式可以传递队列名称
*参数3:消息其它属性
*参数4:消息内容(byte类型,注意转换)
*/
channel.basicPublish("",QUEUE_NAME,nul1,message.getBytes());
//打印
system.out.println("已发送消息:"+message);
}
}
4.4 编写消费者
在com.manong.hello包下创建编写消息消费者Consumer类,代码如下所示:
package com.manong.hello;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*消费者:用于接收信息
*/
public class consumer {
//队列名称
static final string QUEUE_NANE="hello-queue";
public static void main(string[] args) throws Exception{
//队列名称
public static final string QUEUE_NAME="hello-queue";
public static void main(string[] args) throws Exception{
//1.创建工厂对象
ConnectionFactory connectionFactory = new ConnectionF actory();
//2.设置参数信息
//主机地址,默认为localhost
connectionFactory.setHost("192.168.137.129");
//连接端口,默认为5672 ,web端口界面为15672 注意:Linux一定要开放端口号,关闭防火墙
connectionFactory.setPort(5672);
//虚拟主机名称,默认为/
connectionFactory.setVirtualHost("/manong");
//连接用户名,默认为guest
connectionFactory.setUsername("admin");
//连接密码,默认为guest
connectionFactory.setPassword("123456");
//3.创建连接对象
Connection connection = connectionFactory.newConnection();
//4.创建频道对象
Channel channel = connection.createChanne1();
//5.创建队列对象(声明队列)
//参数1:队列名称
//参数2:是否持久化队列
//参数3:是否独占本次连接
//参数4:是否在不使用队列时自动删除
//参数5:队列其他参数
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//6.接收信息
//6.1:创建消费者并设置消息处理
Defaultconsumer consumer = new Defaultconsumer(channel){
/**
*消费回调函数,当收到消息后,会白动执行该方法
* @param consumerTag消费者标识
* @param envelope消息包的内容(如:交换机、路由Key、消息ID等信息)
* @param properties属性信息
* @param body消息数据
* @throws IOException
*/
@override
public void handleDelivery(string consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
//路由key
system.out.println("路由key为: " + envelope.getRoutingKey());
//交换机
system.out.println("交换机为:" + envelope.getExchange());
//消息id
system.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
system.out.println("接收到的消息为:" + new string(body,"utf-8"));
}
};
//6.2监听消息
//参数1:队列名称
//参数2:是否自动确认消息,true则表示消息接收到后自动向MQ回复接收到了,MQ接收到回复会删除消息,设置为false则需要手动确认
//参数3:消息接收收到回调
channel.basicConsume(QUEUE_NAME,true,consumer);
//7.注意:消费者不能关闭连接,要时刻从队列中获取数据
}
}
}
5. 工作队列模式(Work Queues)
5.1 工作队列模式概述
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行,我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
Work Queues工作队列模式的流程图如下图所示:
在Work queues工作队列模式中,不需要设置交换器(RabbitMQ会使用内部默认的交换器进行消息转换)。需要指定唯一的消息队列进行消息传递,并且可以有多个消息消费者。在这种模式下,多个消息消费者通过轮询的方式依次接收消息队列中存储的消息,一旦消息被某一个消费者接收,消息队列会将消息移除,而接收并处理消息的消费者必须在消费完一条消息后再准备接收下一条消息。主要应用于对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
5.2 轮询发布消息
5.2.1 抽取工具类
将连接RabbitMQ服务的代码抽取成工具类,注意修改主机地址、虚拟机名称及用户名和密码。
package com.manong.utils;
import com.rabbitmq.client.connection;
import com .rabbitmq.client.ConnectionFactory;
public class Connectionutil {
/**
* 获取连接对象
* @return
* @throws Exception
*/
public static connecticn getconnection() throws Exception{
//1.创建工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数信息
//主机地址,默认为localhost
connectionFactory.setHost("192.168.137.129");
//连接端口,默认为5672
connectionFactory.setPort(5672);
//虚拟主机名称,默认为/
connectionFactory.setVirtualHost("/manong");
//连接用户名,默认为guest
connectionFactory.setUsername("admin");
//连接密码,默认为guest
connectionFactory.setPassword("123456");
//3.创建连接对象
return connectionFactory. newConnection();
}
5.2.2 编写生产者
在com.manong.work包下编写消息生产者Producer1类,代码如下所示:
public class Producer1 {
//队列名称
static final string QUEUE_NANE="work-queue";
public static void main(string[] args) throws Exception{
//获取连接对象
Connection connection = Connectionutil.getConnection();
//创建频首对象
Channel channel = connection.createChannel();
//创建队列对象
channel.queueDeclare(QUEUE_NAME ,true,false,false,null);
for (int i = 1; i <= 10; i++) {
//发送信息
String message = i + "Hello,RabbitMQvv";
channel.basicPublish("" ,QUEUE_NAME,null,message.getBytes());
}
System.out.println("消息已发送完毕!");
}
}
5.2.3 编写消费者
在com.manong.work包下编写两个消息消费者类,分别是Consumer1和Consumer2。
(1)编写Consumer1类
package com.manong. work;
import com.manong.connectionutil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(string[] args) throws Exception{
//创建连接对象
Connection connection = ConnectionUtil.getConnection();
//创建频道对象
Channel channel = connection.createChannel();
//创建队列对象
channel.queueDeclare(Producer.QUEUE_NAME , true,false,false,null);
//接收信息
//创建消费者并设置消息处理
Defaultconsumer consumer = new DefaultConsumer(channel){
/**
* 消费回调函数,当收到消息后,会自动执行该方法
* @param consumerTag 消费者标识
* @param envelope 消息包的内容(如:交换机、路由Key、消息ID等信息)
* @param properties 属性信息
* @param body 消息数据
* @throws IOException
*/
@override
public void handleDelivery(string consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
//收到的消息
system.out.println("接收到的消息为:" +new string(body,"utf-8"));
}
};
//监听消息(参数1:队列名称,参数2:是否自动确认消息,参数3:消息接收收到回调)
channel.basicConsume(Producer.QUEUE_NAME,true,consumer);
}
}
5.2.4 结果展示
5.2.5 轮询分发消息小结
1.在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
2. Work Queues对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。
5.3 消息应答
5.3.1 消息应答概述
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息从队列中删除了。
5.3.2 消息自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
5.3.3 消息手动应答
消费者从队列中消费消息可以采用手动应答,自动应答可能会导致消息未完全消费从而导致消息失效。使用手动应答的好处是可以批量应答并且减少网络拥堵。以下3个方法用于手动应答消息∶
- Channel.basicAck()∶用于肯定确认,即RabbitMQ已经知道该消息被消费且成功处理消息,可以将其丢弃。
- Channel.basicNack():用于否定确认。
- Channel.basicReject() :用于否定确认,与Channel. basicNack()方法相比少一个参数,即不处理该消息直接拒绝,随后将消息丢弃。
5.3.4批量确认(Multiple)
批量确认的方法为channel. basicAck(deliverTag, true),其中参数2为是否批量确认。若为true,代表批量确认队列上未应答的消息,例如: channel. 上有传送tag的消息5,6,7,8 当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答。若为false, 同上面相比,只会应答tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答。5.3.5 消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另-个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢先任何消息。
5.3.6 消息手动应答代码
(1)创建生产者类
在com. manong . work包下创建生产者Producer2 类,代码如下所示: ```java package com.manong.work;
import com.manong.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer2 { //队列名称 static final string QUEUE NAME=”ack-work-queue” ; public static void main(String[] args) throws Exception{ //获取连接对象 Connection connection = Connectionutil.getConnection(); //创建频道对象 Channel channel = connection.createChannel(); //创建队列对象 channel.queueDeclare(QUEUE NAME ,true,false, false , null); for(inti=1;i<=10;i++){ //发送信息 string message = i + “Hello, RabbitMQ~~~” ; channel.basicPublish(“” ,QUEUE_ NAME ,null ,message.getBytes()); } system.out.println( “消息已发送完毕! “); }
(2) 编写消费者类<br />在com.manong.work包下编写消息消费者类Consumer3代码如下所示:<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/21942751/1656124856203-2633c42e-68c7-4e87-ac4e-eb3bd4398a78.png#clientId=u356e5fa6-4b27-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=403&id=u94595f7d&margin=%5Bobject%20Object%5D&name=image.png&originHeight=403&originWidth=606&originalType=binary&ratio=1&rotation=0&showTitle=false&size=100728&status=done&style=none&taskId=u4386bd77-2239-481f-8a87-87ac2c459a1&title=&width=606)
```java
import com.manong.utils.connectionutil;
import com.manong.utils.sleeputil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumers {
public static void main(string[] args) throws Exception{
//创建连接对象
Connection connection = Connectionutil.getConnection();
//创建频首对象
Channel channel = connection.createChannel();
//创建队列对象
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
system.out.println("消费者1等待接收消息处理时长较短");
//接收信息
//创建消费者并设置消息处理
Defaultconsumer consumer = new Defaultconsumer(channel){
/*
*消费回调函数,当收到消息后,会自动执行该方法
* @param consumerTag消费者标识
* @param envelope消息包的内容(如:交换机、路由Key、消息ID等信息)
* @param properties属性信息
* @aram body消息数据
* throws IOException
*/
@Override
public void handleDelivery(string consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
//睡眠1秒
SleepUtil.sleep(1);
//收到的消息
System.out.println("收到的消息为:" +new String(body,"utf-8"));
//手动确认消息(消息ID,是否批量确认)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听消息(参数1:队列名称,参数2:是否自动确认消息,参数3:消息接收收到回调)
channel.basicConsume(Producer2.QUEUE_NAME,faise, consumer);
}
}
(3)编写消费者类
在com.manong.work包下编写消息消费者类Consumer4 ,代码如下所示:
package com.manong. work;
import com.manong.utils.connectionutil;import com.manong.utils.sleeputil;
import com.rabbitmq.client.* ;
import java.io.IOException;
public class Consumer2 {
public static void main(string[]args) throws Exception{
//创建连接对象
Connection connection = Connectionutil.getConnection();
//创建频道对象
Channel channel = connection.createChannel();
//创建队列对象
channel.queueDeclare(Producer. QUEUE_NAME , true,false,false,null);
System.out.printIn("消费者2等待接收消息,处理时长较长");
//接收信息
//创建消费者并设置消息处理
Defaultconsumer consumer = new DefaultConsumer(channel){
/**
* 消费回调函数,当收到消息后,会自动执行该方法
* @param consumerTag消费者标识
* @param envelope消息包的内容(如:交换机、路由Key、消息ID等信息)
* @param properties属性信息
* @param body消息数据
* @throws IOException
*/
@override
public void handleDelivery(string consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
//睡眠3o秒
sleeputil.sleep(30);
//收到的消息
system.out.println("接收到的消息为:" +new string(body,"utf-8"));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听消息(参数1:队列名称,参数2:是否自动确认消息,参数3:消息接收收到回调)
channel.basicconsume(Producer.QUEUE_NAME,false,consumer);
}
}
5.3.7 消息手动应答效果
首先启动消费者1和消费者2,然后启动生产者。正常情况下生产者发送到队列中,两个消费者分别接收到消息并进行处理。
再次重新测试,观察两个消费者的消费情况,随后将消费者2停掉,再次观察消费者1控制台打印的信息,发现队列中未被消费的消息重新进入到了队列中,并被消费者1进行消费。
5.4 RabbitMQ持久化
5.4.1 RabbitMQ持久化概念
前面我们已经学习了如何处理任务不丢失的情况,但是如何保障RabbitMQ服务停掉后消息生产者发送过来的消息不丢失呢?默认情况下,RabbitMQ服务宕机或由于某种原因崩溃时,它会忽略队列中的消息。如果要确保消息不会丢失,则需要将队列和消息都标识为持久化。
5.4.2 队列持久化
在创建队列时如果没有将队列设置成持久化,RabbitMQ服务一旦重启,该队列就会被删除掉。如果要实现队列的持久化,需要在声明队列时将durable参数设置为true。
//参数依次为:队列名称、是否持久化、是否排它、是否自动删除、其它参数
Queue.Declareok queueDeclare(string queue,boolean durable,boolean exclusive,
boolean autoDelete,Map<string,object> arguments) throws IOException;
需要注意的是,如果之前声明的队列不是持久化的,需要将原有的队列先删除,或者重新创建一个新的持久化的队列,否则会出现错误。
以下为控制台中持久化与非持久化队列的显示区域(持久化状态为D):
5.4.3 消息持久化
如果要实现消息持久化,需要在消息生产者发送消息时设置MessageProperties.PERSITENT_TEXT_PLALINM属性,如下代码所示:
//参数依次为:交换机名称、队列名称、消息持久化、消息
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
将消息标记为持久化并不能完全保证不会丢失消息,尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,则可以使用发布确认模式。
5.4.4 不公平分发
最开始的时候我们学习到RabbitMQ分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,例如:有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,此时采用轮询分发的话,处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。为了避免这种情况,我们可以在消费者消费消息前设置参数channel.basicQos(1)。
6. 订阅模式
6.1 订阅模式概述
在前面的简单模式和工作队列模式中只有3个角色,分别是生产者、队列和消费者,如下图所示:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接收者,会一直等待消息到来。
- queue:消息队列,图中红色部分。
而在订阅模型中,多了一个exchange(交换机)的角色,而且过程略有变化,如下图所示:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息;另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
6.2 交换机Exchange
6.2.1 交换机概述
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列,交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们,这就的由交换机的类型来决定。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
6.2.2 交换机类型
Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
6.3 Publish/Subscribe 发布订阅模式
6.3.1 发布订阅模式概述
Publish/Subscribe 发布订阅模式的流程图如下图所示:
在Publish/Subscribe工作模式中,必须先配置一个fanout类型的交换器,不需要指定对应的路由键(Routingkey),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,进而由各自消息队列关联的消费者进行消费。
从上面的分析可以发现,该工作模式是用于进行相同业务功能处理的场合。例如,用户注册成功后,需要同时发送邮件通知和短信通知,那么邮件服务消费者和短信服务消费者需要共同消费“用户注册成功”这一条消息。
6.3.2 编写生产者
在com.manong.fanout包下编写消息生产者 Producer1类,代码如下所示:
public class Producer {
//交换机名称
static final string FANOUT_EXCHANGE = "fanout_exchange";
//队列名称
static final string FANOUT_QUEUE_1 = "fanout_queue_1";
//队列名称
static final string FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(string[] args) throws Exception{
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//创建频道对象
Channel channel = connection.createchannel();
//设置交换机
//参数1:交换机名称
//参数2:交换机类型(direct定向, fanout广播, topic通配符的方式, headers参数匹配方式)
channel.exchangeDeclare(FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//创建队列对象
//参数1:队列名称
//参数2:是否持久化队列
//参数3:是否独占本次连接
//参数4:是否自动删除队列
//参数5:其它参数
channel. queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
//队列绑定交换机
//参数1:队列名称
//参数2:交换机名称
//参数3:路由键(绑定规则),如果交换机的类型为fanout,则routingKey设置为""
channel.queueBind( FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
//发送信息
for (int i = 1; i <= 10; i++){
String message = i + "--你好,小兔子!发布订阅模式";
channel.basicPublish(FANOUT_EXCHANGE,"" ,null,message.getBytes());
}
System.out.println("消息已发送完毕");
//释放资源
channel.close();
connection.close();
}
}
6.3.3 编写消费者1
在com.manong.fanout包下编写消息消费者Consumer1类,代码如下所示:
public class consumer1 {
public static void main(string[ ] args) throws Exception {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//创建频道对象
Channel channel = connection.createchannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//声明(创建)队列
/*
*参数1:队列名称
*参数2:是否定义持久化队列
*参数3:是否独占本次连接
*参数4:是否在不使用的时候自动删除队列
*参数5:队列其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
//队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
//创建消费者;并设置消息处理
Defaultconsumer consumer = new Defaultconsumer(channel){
@Override
/*
* consumerTag消息者标签,在channel.basicconsume时候可以指定
* envelope消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志(收到消息失败后
是否需要重新发送)
* properties属性信息
* body消息
*/
public void handleDelivery(string consumerTag,Envelope envelope,AQP.BasicProperties
properties, byte[] body) throws IOException {
//路由key
//system.out. println(路由key为:" + envelope.getRoutingKey());
//交换机
//system.out. println(""交换机为:” + envelope.getExchange();
//消息id
//system.out . printin"“消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为: " +new string( body,"utf-8"));
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认串
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.FANOUT_QUEUE_1,true,consumer);
}
}
6.3.4 编写消费者2
public class consumer1 {
public static void main(string[ ] args) throws Exception {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//创建频道对象
Channel channel = connection.createchannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//声明(创建)队列
/*
*参数1:队列名称
*参数2:是否定义持久化队列
*参数3:是否独占本次连接
*参数4:是否在不使用的时候自动删除队列
*参数5:队列其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
//队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");
//创建消费者;并设置消息处理
Defaultconsumer consumer = new Defaultconsumer(channel){
@Override
/*
* consumerTag消息者标签,在channel.basicconsume时候可以指定
* envelope消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志(收到消息失败后
是否需要重新发送)
* properties属性信息
* body消息
*/
public void handleDelivery(string consumerTag,Envelope envelope,AQP.BasicProperties
properties, byte[] body) throws IOException {
//路由key
//system.out. println(路由key为:" + envelope.getRoutingKey());
//交换机
//system.out. println(""交换机为:” + envelope.getExchange();
//消息id
//system.out . printin"“消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为: " +new string( body,"utf-8"));
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认串
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2,true,consumer);
}
}
6.3.5发布订阅模式测试
6.3.6发布订阅模式小结
1.交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
2.发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机
6.4 Routing路由模式
6.4.1 路由模式概述
Routing路由工作模式的流程图如下所示:
在Routing工作模式中,必须先配置一个direct类型的交换器,并指定不同的路由键值(Routing key)〉将对应的消息从交换器路由到不同的消息队列进行存储,由消费者进行各自消费。
从上面的分析可以发现,工作模式适用于进行不同类型消息分类处理的场合。例如,日志收集处理用户可以配置不同的路由键值,分别对不同级别的日志信息进行分类处理。6.4.2 编写生产者
在com.manong.direct包下编写消息消费者Consumer1类,代码如下所示: ```java public class Producer { //交换机名称 public static final String DIRECT_EXCHANGE = “direct_exchange”; //定义两个队列 public static final String DIRECT_QUEUE_1 = “direct_queue_1”; public static final String DIRECT_QUEUE_2 = “direct_queue_2”;
public static void main(String[] args){ try{ //创建频道对象 Channel channel = connection.createChannel(); //声明交换机(交换机名称、交换机类型、是否持久化交换机) channel.exchangeDeclare(DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true); //声明两个队列 channel.yueueDeclare(DIRECT_QUEUE_1,true,false,false,null); channel.queueDeclare(DIRECT_QUEUE_2,true,false,false,null); //将交换机与队列进行绑定 //队列1绑定交换机 channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,”error”); //队列2绑定交换机 channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,”info”); channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,”error”); channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,”warning”); //准备消息 string message = “日志信息:王明调用了find()方法,日志级别为: error”; //发送消息 channel.basicPublish(DIRECT_EXCHANGE,”error”,null,message.getBytes()); System.out.println(“消息发送成功~”); }catch (Exception e) { e.printstackTrace(); } } }
<a name="A5AA7"></a>
### 6.4.3 编写消费者1
在com.manong.direct包下编写消息消费者Consumer1 类,代码如下所示:
```java
public class Consumer1 {
public static void main(String[ ] args) throws Exception {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//创建频道对象
Channel channel = connection.createChannel();
//创建消费者;并设置消息处理
DefaultConsumer consumer = new Defaultconsumer(channel){
@Override
/*
*consumerTag消息者标签,在channel.basicConsume时候可以指定
*envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
*properties属性信息
*body消息
*/
public void handleDelivery ( String consumerTag,Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body,"utf-8"));
System.out.println("日志信息保存到数据库....");
}
};
//监听消息
/*
*参数1:队列名称
*参数2:是否自动确认
*参数3:消息接收到后回调
*/
channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);
}
}
6.4.4 编写消费者2
6.4.5 路由模式测试
启动所有消费者,然后使用生产者发送消息﹔在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击direct_exchange的交换机,可以查看到如下的绑定:
Exchange: direct_exchange in virtual host /manong
6.4.6 小结
Routing 模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key 的队列。
6.5 Topics 主题/通配符模式
6.5.1 Topics主题/通配符模式
Topics通配符工作模式的流程图如下图所示:
在Topics工作模式中,必须先配置一个topic类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,然后由消费者进行各自消费。
Topics模式和Routing模式的主要不同在于:Topics模式设置的路由键是包含通配符的,其中#表示匹配零到多个字符,*表示匹配一个字符,然后与其他字符一起使用 “.” 进行连接,从而组成动态路由键,在发送消息时可以根据需求设置不同的路由键,从而将消息路由到不同的消息队列。
通常情况下,Topics工作模式适合用于根据不同需求动态传递处理业务的场合。例如,一些订阅客户只接收邮件消息,一些订阅客户只接收短信消息,那么可以根据客户需求进行动态路由匹配,从而将订阅消息分发到不同的消息队列中。
6.5.2 编写生产者
在com.manong.topics包下编写消息生产者Producer1类,代码如下所示:
public class Producer {
//交换机名称
static final string Topic_EXCHANGE = "topic_exchange";
//队列名称
static final string Topic_QUEUE_1 = "topic_queue_1";
//队列名称
static final string Topic_QUEUE_2 = "topic_queue_2";
public static void main(string[] args) throws Exception{
//获取连接对象
Connection connection = Connectionutil.getconnection();
//创建频道对象
Channel channel = connection.createchannel();
//声明交换机(交换机名称、交换机类型、是否持久化交换机)
channel.exchangeDeclare(Topic_EXCHANGE,BuiltinExchangeType.DIRECT,true);
//设置交换机
//参数1:交换机名称
//参数2:交换机类型(direct定向, fanout广播, topic通配符的方式,headers参数匹配方式)
//参数3:是否持久化
//参数4:是否自动删除队列
//参数5:其它参数
channel.queueDeclare(Topic_QUEUE_1,true,false,false,null);
channel.queueDeclare(Topic_QUEUE_2,true,false,false,null);
//队列绑定交换机
//参数1:队列名称
//参数2:交换机名称
//参数3:路由键(绑定规则),如果交换机的类型为fanout,则routingKey设置为""
//队列1绑定交换机
//#匹配多个字符
channel.queueBind(Topic_QUEUE_1,Topic_EXCHANGE,"#.error");
//队列2绑定交换机
//*匹配1个字符
channel.queueBind(Topic_QUEUE_2,Topic_EXCHANGE,"order.*");
channel.queueBind(Topic_QUEUE_2,Topic_EXCHANGE,"*.*");
//发送信息
String message = "日志信息:王明调用了find()方法,日志级别为: info";
channel.basicpublish(Topic_EXCHANGE, "goods.info", null,message.getBytes());
System.out.println("消息已发送完毕");
//释放资源
channel.close();
connection.close();
}
}
6.5.3 编写消费者1
在com.manong.topics包下编写消息消费产者Consumer1类,代码如下所示:
public class consumer1 {
public static void main(string[ ] args) throws Exception {
//获取连接对象
Connection connection = Connectionutil.getconnection();
//创建频道对象
Channel channel = connection.createchannel();
//创建消费者;并设育消息处理
Defaultconsumerlconsumer = new DefaultConsumer(channel){
@Override
/*
* consumerTag消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消良和重传标志(收到消息失败后是否需要重新发送)
* properties属性信息
* body 消息
*/
public void handleDelivery(string consumerTag,Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException{
//收到的消息
System.out.println("消费者1-接收到的消息为: " + new string(body,"utf-8"));
System.out.println("日志信息保存到数据库....");
}
};
//监听消息
/*
*参数1:队列名称
*参数2:是否自动确认
*参数3:消息接收到后回调*/
channel.basicConsume(Producer.Topic_QUEUE_1,true,consumer);
}
}
6.5.4 编写消费者2
6.6 Publisher Confirms发布确认模式
6.6.1 发布确认模式概述
生产者将信道(channel)设置成confrm模式,一旦信道进入confrm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag 域包含了确认消息的序列号,此外 broker也可以设置basic.ack 的 multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
上图中,生产者发送消息到队列及消费者从队列中消费消息,这个过程中任何一个环节出现问题,都有可能导致消息丢失(未能正常发送消息及被消费),如何确保消息正常发送及消费呢?前面学习了RabbitMQ中的队列持久化和消息持久化,这两个持久化并不能完全确保消息的可靠传递,要实现消息的可靠传递,必须同时满足以下3个条件:
1.声明队列时必须持久化
生产者发送消息到队列时,将durable参数设置为true (意为队列持久化操作),如下代码所示:
//参数依次为:队列名称、是否持久化、是否排它、是否自动删除、其它参数
Queue.Declareok queueDeclare(string queue,boolean durable,
boolean exclusive,boolean autoDelete,
Map<string,object> arguments) throws IOException;
以下为控制台中持久化与非持久化队列的显示区域(持久化状态为D):
2.队列中的消息必须持久化
消息持久化需要在生产者发送消息时设置 MessageProperties.PERSITENT_TEXT_PLAIN 属性,如下代码所示:
//参数依次为:交换机名称、队列名称、消息持久化、消息
channel.basicPublish("", QUEUE_NANE,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
3.发布确认
队列接收到生产者发送过来的数据后,队列将消息保存在磁盘后(达到持久化的目的),队列则将最终的可靠传递结果告诉生产者,这就是发布确认。RabbitMQ发布确认常用的策略有3种,分别是单个确认发布,批量确认发布和异步确认发布。
6.6.2 开启发布确认模式
RabbitMQ的发布确认模式默认是没有开启的,如果需要开启发布确认模式,需要调用confirmSelect()方法,在想要使用发布确认模式时,都需要在channel上调用该方法。
//创建信道对象
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
6.6.3 单个确认发布
单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后,只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
package com.manong. confirm;
import com.manong.utils.Connectionutil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.uti1.UUID;
public class confirmMessage {
//消息数量
public static final Integer MESSAGE_COUNT = 1000;
public static void main(string[] args) throws Exception{
//调用单个确认消息
publishsingleconfirm();
}
/*
*单个确认
*/
public static void publishsingleconfirm() throws Exception{
//创建Connection对象
Connection connection = connectionutil.getConnection();
//创建信道对象
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
//队列名称
String queueName = UUID.randomUUID().tostring();
//创建队列对象
channel. queueDeclare(queueName ,true,false,false,null);
//记录开始时间
long start = System.currentTimeAillis();//发送消息
for (Integer i = 0; i < MESSAGE_COUNT; i++){
//消息
String message = i.tostring();//发送消息
channel.basicPublish("" ,queueName,message.getBytes());
//服务端返回false或在超时时间内没有返回数据,生产者可以重新发送消息
boolean flag = channe1.waitForConfirms();
if(flag){
System.out.println("消息发送成功"+(i+1));
}else{
System.out.println("=====第"+(i+1)+"条消息发送失败=====");
}
}
//记录结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,共耗时"+(end-start)+"毫秒。");
}
}
6.6.4 批量确认发布
单个确认发布方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,无法得知是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
/*
*批量确认
*/
public static void publishBatchconfirm() throws Exception{
//创建connection对象
Connection connection = Connectionutil.getConnection();
//创建信道对象
channel channel = connection.createchannel();
//开启发布确认
channel.confirmselect();
//批量确认消息数量
int batchsize = 100;
//未确认消息数量
int nackMessagecount = 0;
//队列名称
String queueName = UUID.randomUUID().tostring();
//创建队列对象
channel.queueDeclare(queueName,true,false,false,null);
//记录开始时间
long start = System.currentTimeMillis();
//发送消息
for (Integer i = 0; i <MESSAGE_COUNT; i++){
//消息
String message = i.tostring();
//发送消忘
channel.basicPublish("",queueName,null,message.getBytes());
//累加未确认消息个数
nackMessagecount++;
//判断未确认消息数量与批量确认消息数量是否一致
if(nackMessagecount == batchsize){
//批量确认
boolean flag = channe1.waitForConfirms();
if(flag){
System.out.println(“消息发送成功"+(i+1));
}else{
System.out.println("===第"+(i+1)+"条消息发送失败===");
}
//清空末确认消息个数
nackMessageCount = 0;
}
//为了确保剩余已经没有未确认的消息,进行再次确认
if(nackMessageCount>0){
//批量确认
channel.waitForConfirms();
}
}
//记录结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+NESSAGE_COUNT+"个批量确认消息,共耗时"+(end- start)+"毫秒。");
}
6.6.5 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率方面,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。