⼀、消息队列介绍
1.1 同步调用与异步调用
同步调用:
Feign客户端可以实现服务间的通信,但是Feign是同步调用,也就是说A服务调用B服务之后,会进⼊阻塞/等待状态,直到B服务返回调用结果给A服务, A服务才会继续往后执行在特定的业务场景中:用户注册成功之后,发送短息通知用户(A服务为用户注册, B服务发送短信) A服务在完成用户注册之后(代码1),调用B服务发送短信, A服务完成B服务调用之后无需等待B服务的执行接口,直接执行提示用户注册从公(代码2),在这种需求下A服务调用B服务如果使用同步调用,必然降低A服务的执行效率,因此在这种场景下A服务需要通过异步调用 调用B服务
异步调用:
当A服务调用B服务之后,无需等待B的调用结果,可以继续往下执行;那么服务间的异步通信该如何实现呢?
服务之间可以通过消息队列实现异步调用
- 同步调用
- A服务调用B服务,需要等待B服务执行完毕的返回值, A服务才可以继续往下执行
- 同步调用可以通过REST和RPC完成
- REST: ribbon、 Feign
- RPC: Dubbo
- 异步调用
- A服务调用B服务,而无需等待B服务的执行结果,也就是说在B服务执行的同时A服务可以继续往下执行
- 通过消息队列实现异步调用
1.2 消息队列概念
- MQ全称为Message Queue,消息队列(MQ)是⼀种应⽤程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
- 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
1.3 常用的消息队列产品
- RabbitMQ 稳定可靠,数据⼀致,支持多协议,有消息确认,基于erlang语言
- Kafka 高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高
- ActiveMQ 不够灵活轻巧,对队列较多情况⽀持不好.
- RocketMQ 性能好,⾼吞吐,⾼可⽤性,⽀持⼤规模分布式,协议⽀持单⼀
⼆、 RabbitMQ
2.1 RabbitMQ介绍
- RabbitMQ是⼀个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
- AMQP,即Advanced Message Queuing Protocol, ⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语⾔等条件的限制。 Erlang中的实现有 RabbitMQ等。
- 主要特性:
**保证可靠性**
:使⽤⼀些机制来保证可靠性,如持久化、传输确认、发布确认- 灵活的路由功能
- 可伸缩性:⽀持消息集群,多台RabbitMQ服务器可以组成⼀个集群
**⾼可⽤性**
: RabbitMQ集群中的某个节点出现问题时队列仍然可⽤- ⽀持多种协议
- ⽀持多语⾔客户端
**提供良好的管理界⾯**
- 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
- 提供插件机制:可通过插件进⾏多⽅⾯扩展
2.2 RabbitMQ安装和配置
参考安装⽂档: RabbitMQ安装及配置.pdf
2.3 RabbitMQ逻辑结构
RabbitMQ逻辑结构 |
---|
三、 RabbitMQ用户管理
RabbitMQ默认提供了⼀个guests账号,但是此账号不能作远程登录,也就是不能在管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录
3.1 逻辑结构
- 用户
- 虚拟主机
- 队列
3.2 用户管理
3.2.1 命令行用户管理
新增⽤户
./rabbitmqctl add_user ytao admin123
- 设置用户级别
```shell
## ⽤户级别:
## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理
## 2.monitoring 监控者 登录控制台、查看所有信息
## 3.policymaker 策略制定者 登录控制台、指定策略
## 4.managment 普通管理员 登录控制台
./rabbitmqctl set_user_tags ytao administrator
3.2.2 管理系统进行用户管理
管理系统登录:访问 http://47.96.11.185:15672/
1.新增用户 |
---|
2.创建虚拟主机 |
---|
3.删除用户 |
---|
4.用户绑定虚拟主机 |
---|
四、 RabbitMQ工作方式
RabbitMQ提供了多种消息的通信方式—工作模式
消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)
4.1 简单模式
⼀个队列只有⼀个消费者
4.2 工作模式
多个消费者监听同⼀个队列
4.3 订阅模式
⼀个交换机绑定多个消息队列,每个消息队列有⼀个消费者监听
4.4 路由模式
⼀个交换机绑定多个消息队列,每个消息队列都由自己唯⼀的key,每个消息队列有⼀个消费者监听
路由模式 |
---|
五、 RabbitMQ交换机和队列管理
5.1 创建队列
5.2 创建交换机
5.3 交换机绑定队列
六、在普通的Maven应用中使用MQ
RabbitMQ队列结构 |
---|
6.1简单模式
6.1.1 消息生产者
- 创建Maven项目
添加RabbitMQ连接所需要的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
在resources目录下创建log4j.properties
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
log4j.logger.org.mybatis = DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
创建MQ连接帮助类
```java package com.qfedu.mq.utils;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.创建连接⼯⼚ ConnectionFactory factory = new ConnectionFactory(); //2.在⼯⼚对象中设置MQ的连接信息(ip,port,virtualhost,username,password) factory.setHost(“47.96.11.185”); factory.setPort(5672); factory.setVirtualHost(“host1”); factory.setUsername(“ytao”); factory.setPassword(“admin123”); //3.通过⼯⼚对象获取与MQ的链接 Connection connection = factory.newConnection(); return connection; } }
- 消息生产者发送消息
```java
package com.qfedu.mq.service;
import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class SendMsg {
public static void main(String[] args) throws Exception{
String msg = "Hello HuangDaoJun!";
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//定义队列(使⽤Java代码在MQ中新建⼀个队列)
//参数1:定义的队列名称
//参数2:队列中的数据是否持久化(如果选择了持久化)
//参数3: 是否排外(当前队列是否为当前连接私有)
//参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数据))
//参数5:设置当前队列的参数
//channel.queueDeclare("queue7",false,false,false,null);
//参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
//参数2:⽬标队列名称
//参数3:设置当前这条消息的属性(设置过期时间 10)
//参数4:消息 的内容
channel.basicPublish("","queue1",null,msg.getBytes());
System.out.println("发送: " + msg);
channel.close();
connection.close();
}
}
6.1.2 消息消费者
- 创建Maven项目
- 添加依赖
- log4j.properties
- ConnetionUtil.java
- 消费者消费消息
```java package com.qfedu.mq.service;
import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(“接收: “+msg); } }; channel.basicConsume(“queue1”,true,consumer); } }
<a name="63df28ed"></a>
### 6.2 工作模式
> ⼀个发送者多个消费者
<a name="d81a7194"></a>
#### 6.2.1 发送者
```java
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息: ");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish("","queue2",null,msg.getBytes());
System.out.println("发送: " + msg);
channel.close();
connection.close();
}
}
}
6.2.2 消费者1
public class ReceiveMsg {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收: "+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue2",true,consumer);
}
}
6.2.3 消费者2
public class ReceiveMsg {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收: "+msg);
}
};
channel.basicConsume("queue2",true,consumer);
}
}
6.3 订阅模式
6.3.1 发送者 发送消息到交换机
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息: ");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish("ex1","",null,msg.getBytes());
System.out.println("发送: " + msg);
channel.close();
connection.close();
}
}
}
6.3.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收: "+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue3",true,consumer);
}
}
6.3.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收: "+msg);
}
};
channel.basicConsume("queue4",true,consumer);
}
}
6.4 路由模式
6.4.1 发送者 发送消息到交换机
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息: ");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
if(msg.startsWith("a")){
channel.basicPublish("ex2","a",null,msg.getBytes());
}else if(msg.startsWith("b")){
channel.basicPublish("ex2","b",null,msg.getBytes());
}
System.out.println("发送: " + msg);
channel.close();
connection.close();
}
}
}
6.4.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收: "+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue5",true,consumer);
}
}
6.4.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收: "+msg);
}
};
channel.basicConsume("queue6",true,consumer);
}
}
七、在SpringBoot应用中使用MQ
SpringBoot应用可以完成自动配置及依赖注⼊——可以通过Spring直接提供与MQ的连接对象
7.1 消息生产者
- 创建SpringBoot应用,添加依赖
配置application.yml
server:
port: 9001
spring:
application:
name: producer
rabbitmq:
host: 47.96.11.185
port: 5672
virtual-host: host1
username: ytao
password: admin123
发送消息
@Service
public class TestService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendMsg(String msg){
//1. 发送消息到队列
amqpTemplate.convertAndSend("queue1",msg);
//2. 发送消息到交换机(订阅交换机)
amqpTemplate.convertAndSend("ex1","",msg);
//3. 发送消息到交换机(路由交换机)
amqpTemplate.convertAndSend("ex2","a",msg);
}
}
7.2 消息消费者
- 创建项目添加依赖
- 配置yml
- 接收消息
@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {
@RabbitHandler
public void receiveMsg(String msg){
System.out.println("接收MSG: "+msg);
}
}
八、使用RabbitMQ传递对象
RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息
8.1 使用序列化对象
要求:
- 传递的对象实现序列化接⼝
- 传递的对象的包名、类名、属性名必须⼀致
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods){
//消息队列可以发送 字符串、字节数组、序列化对象
amqpTemplate.convertAndSend("","queue1",goods);
}
}
消息消费者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(Goods goods){
System.out.println("Goods---"+goods);
}
}
8.2 使用序列化字节数组
要求:
- 传递的对象实现序列化接口
- 传递的对象的包名、类名、属性名必须⼀致
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods){
//消息队列可以发送 字符串、字节数组、序列化对象
byte[] bytes = SerializationUtils.serialize(goods);
amqpTemplate.convertAndSend("","queue1",bytes);
}
}
消息消费者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(byte[] bs){
Goods goods = (Goods) SerializationUtils.deserialize(bs);
System.out.println("byte[]---"+goods);
}
}
8.3 使用JSON字符串传递
要求:对象的属性名⼀直
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
//消息队列可以发送 字符串、字节数组、序列化对象
ObjectMapper objectMapper = new ObjectMapper();
String msg = objectMapper.writeValueAsString(goods);
amqpTemplate.convertAndSend("","queue1",msg);
}
}
消息消费者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(String msg) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
Goods goods = objectMapper.readValue(msg,Goods.class);
System.out.println("String---"+msg);
}
}
九、基于Java的交换机与队列创建
我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过Java代码来完成创建
9.1 普通Maven项目交换机及队列创建
使用Java代码新建队列
//1.定义队列 (使⽤Java代码在MQ中新建⼀个队列)
//参数1:定义的队列名称
//参数2:队列中的数据是否持久化(如果选择了持久化)
//参数3: 是否排外(当前队列是否为当前连接私有)
//参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数据))
//参数5:设置当前队列的参数
channel.queueDeclare("queue7",false,false,false,null);
新建交换机
//定义⼀个“订阅交换机”
channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
//定义⼀个“路由交换机”
channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
绑定队列到交换机
//绑定队列
//参数1:队列名称
//参数2:⽬标交换机
//参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
channel.queueBind("queue7","ex4","k1");
channel.queueBind("queue8","ex4","k2");
9.2 SpringBoot应用中通过配置完成队列的创建
@Configuration
public class RabbitMQConfiguration {
//声明队列
@Bean
public Queue queue9(){
Queue queue9 = new Queue("queue9");
//设置队列属性
return queue9;
}
@Bean
public Queue queue10(){
Queue queue10 = new Queue("queue10");
//设置队列属性
return queue10;
}
//声明订阅模式交换机
@Bean
public FanoutExchange ex5(){
return new FanoutExchange("ex5");
}
//声明路由模式交换机
@Bean
public DirectExchange ex6(){
return new DirectExchange("ex6");
}
//绑定队列
@Bean
public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
return BindingBuilder.bind(queue9).to(ex6).with("k1");
}
@Bean
public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
return BindingBuilder.bind(queue10).to(ex6).with("k2");
}
}
十、消息的可靠性
消息的可靠性:从
生产者发送消息
——消息队列存储消息
——消费者消费消息
的整个过程中消息的安全性及可
控性。
- 生产者
- 消息队列
- 消费者
10.1 RabbitMQ事务
RabbitMQ事务指的是基于客户端实现的事务管理,当在消息发送过程中添加了事务,处理效率降低⼏⼗倍甚
⾄上百倍
Connection connection = RabbitMQUtil.getConnection(); //connection 表示与 host1的连接
Channel channel = connection.createChannel();
channel.txSelect(); //开启事务
try{
channel.basicPublish("ex4", "k1", null, msg.getBytes());
channel.txCommit(); //提交事务
}catch (Exception e){
channel.txRollback(); //事务回滚
}finally{
channel.close();
connection.close();
}
10.2 RabbitMQ消息确认和return机制
消息确认机制:确认消息提供者是否成功发送消息到交换机
return机制:确认消息是否成功的从交换机分发到队列
10.2.1 普通Maven项目的消息确认
- 普通confirm方式
```java //1.发送消息之前开启消息确认 channel.confirmSelect();
channel.basicPublish(“ex1”, “a”, null, msg.getBytes());
//2.接收消息确认 boolean b = channel.waitForConfirms();
System.out.println(“发送: “ +(b?”成功”:”失败”));
- 批量confirm方式
```java
//1.发送消息之前开启消息确认
channel.confirmSelect();
//2.批量发送消息
for (int i=0 ; i<10 ; i++){
channel.basicPublish("ex1", "a", null, msg.getBytes());
}
//3.接收批量消息确认:发送的所有消息中,如果有⼀条是失败的,则所有消息发送直接失败,抛出IO异常
boolean b = channel.waitForConfirms();
- 异步confirm方式
```java //发送消息之前开启消息确认 channel.confirmSelect();
//批量发送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish(“ex1”, “a”, null, msg.getBytes()); } //假如发送消息需要10s, waitForConfirms会进⼊阻塞状态 //boolean b = channel.waitForConfirms();
//使⽤监听器异步confirm
channel.addConfirmListener(new ConfirmListener() {
//参数1: long l 返回消息的表示
//参数2: boolean b 是否为批量confirm
public void handleAck(long l, boolean b) throws IOException {
System.out.println(“~消息成功发送到交换机”);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println(“~消息发送到交换机失败”);
}
});
<a name="cd79f722"></a>
#### 10.2.2 普通Maven项目的return机制
- 添加return监听器
- 发送消息是指定第三个参数为true
- 由于监听器监听是异步处理,所以在消息发送之后不能关闭channel
```java
String msg = "Hello HuangDaoJun!";
Connection connection = ConnectionUtil.getConnection(); //相当于JDBC操作的数据库连接
Channel channel = connection.createChannel(); //相当于JDBC操作的statement
//return机制:监控交换机是否将消息分发到队列
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
//如果交换机分发消息到队列失败,则会执⾏此⽅法(⽤来处理交换机分发消息到队列失败的情况)
System.out.println("*****"+i); //标识
System.out.println("*****"+s); //
System.out.println("*****"+s1); //交换机名
System.out.println("*****"+s2); //交换机对应的队列的key
System.out.println("*****"+new String(bytes)); //发送的消息
}
});
//发送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish("ex2", "c", true, null, msg.getBytes());
10.3 在SpringBoot应用实现消息确认与return监听
10.3.1 配置application.yml,开启消息确认和return监听
spring:
rabbitmq:
publisher-confirm-type: simple ## 开启消息确认模式
publisher-returns: true ##使⽤return监听机制
10.3.2 创建confirm和return监听
消息确认
@Component
public class MyConfirmListener implements RabbitTemplate.ConfirmCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
//参数b 表示消息确认结果
//参数s 表示发送的消息
if(b){
System.out.println("消息发送到交换机成功! ");
}else{
System.out.println("消息发送到交换机失败! ");
amqpTemplate.convertAndSend("ex4","",s);
}
}
}
return机制
@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息从交换机分发到队列失败");
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
String msg = returnedMessage.getMessage().toString();
amqpTemplate.convertAndSend(exchange,routingKey,msg);
}
}
10.4 RabbitMQ消费者手动应答
@Component
@RabbitListener(queues="queue01")
public class Consumer1 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
try {
System.out.println("get msg1 success msg = "+msg);
/**
* 确认⼀条消息: <br>
* channel.basicAck(deliveryTag, false); <br>
* deliveryTag:该消息的index <br>
* multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br>
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
/**
* 拒绝确认消息:<br>
* channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
* deliveryTag:该消息的index<br>
* multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。 <br>
* requeue:被拒绝的是否重新⼊队列 <br>
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
System.err.println("get msg1 failed msg = "+msg);
}
}
}
10.5 消息消费的幂等性问题
消息消费的幂等性——多次消费的执⾏结果时相同的 (避免重复消费)
解决⽅案:处理成功的消息setnx到redis
十一、延迟机制
11.1 延迟队列
- 延迟队列——消息进⼊到队列之后,延迟指定的时间才能被消费者消费
- AMQP协议和RabbitMQ队列本身是不⽀持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能
- TTL就是消息的存活时间。 RabbitMQ可以分别对队列和消息设置存活时间
- 在创建队列的时候可以设置队列的存活时间,当消息进⼊到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
- 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;
- 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列
11.2 使用延迟队列实现订单支付监控
11.2.1 实现流程图
11.2.2 创建交换机和队列
1.创建路由交换机 |
---|
2.创建消息队列 |
---|
3.创建死信队列 |
---|
4.队列绑定 |
---|
⼗⼆、消息队列作用/使用场景总结
12.1 解耦
场景说明:用户下单之后,订单系统要通知库存系统
传统方式:订单系统直接调⽤库存系统提供的接口,如果库存系统出现故障会导致订单系统失败 |
---|
使用消息队列: |
---|
12.2 异步
场景说明:用户注册成功之后,需要发送注册邮件及注册短信提醒
传统方式: |
---|
使用消息队列: |
---|
12.3 消息通信
场景说明:应用系统之间的通信,例如聊天室
聊天室 |
---|
12.4 流量削峰
场景说明:秒杀业务
大量的请求不会主动请求秒杀业务,而是存放在消息队列(缓存) |
---|
12.5 日志处理
场景说明:系统中大量的日志处理
日志搜集处理 |
---|