1,异步通讯和同步通讯:
- **异步通讯:单个服务可以和多个服务进行通讯;**
- 同步通讯:单个服务只能与一个服务进行通讯;
1.1,同步调用的问题:
- **微服务之间采用Feign的调用就是同步方式,会出现以下的问题:**
- **耦合度高:**每次添加新需求都要更改原来的代码;
- **性能低下:**调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用完成的时间;
- **资源浪费:**调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,在高并发的场景下会极度浪费资源;
- **级联失败:**如果服务提供者出现问题,那么所有的调用方都会受影响,产生多米诺骨牌效应;导致整个微服务群崩溃;
1.2,异步调用的优缺:
- **优点:;**
- 服务解耦;
- 性能提升,吞吐量提高;
- 服务没有强依赖,不用担心级联失败问题;
- **流量削峰**:一个队列在一端承接瞬时的流量洪峰(缓存),在另一端平滑地将消息推出去,按照系统的性能稳步处理访问请求;
- **缺点:**
- 依赖Broker的可靠性,安全性,吞吐能力;
- 架构复杂,业务没有明显的流程线,不好追踪管理;
- 响应会比同步通讯会有延迟;
1.3,* 异步调用的实现方案:
- **引入事件代理者,Broker:一个事件驱动器**
- **解决事件发布者和订阅者之间的耦合,两者中间引入一个中间人Broker,使得两者不是直接进行通讯,而是让事件发布者将事件分布到中间件中,让订阅者在中间件订阅事件,当有事件更新直接获取对应的事件就行了;**
- **示例:事件驱动架构:**
1.4,同步通讯与异步通讯分别的使用场景:
- **同步通讯:(大部分情况下)**
- 一个服务只需要访问一个服务且(一对一)需要**及时收到响应**的情况下就能使用同步,如登录服务,只需要进行登录验证环节就足够了,那么就能使用同步通讯;
- **在大部分业务情况下,都需要实时响应,就比如:查询列表就必须要实时获取列表数据的响应;**
- **异步通讯:**
- 在面对**高并发**,**高吞吐量**,而且单个服务要服务不同的服务(单对多)来处理不同的业务且不需要实时响应的需求,那么优先选择异步通讯;
2,* 什么是MQ:
MQ(MessageQueue)
- **MQ**:就是就是存放消息的队列,译名为:**消息队列**;也就是在事件驱动架构中的Broker;
当有消费者拉取了消息队列里面的消息,那就会阅后即焚,消息队列里面的东西就不见了;
- 而实现消息队列的技术架构被称为:消息队列中间件;
- 在市面上常见的几种**消息队列中间件**:
在国内主流的中间件有:RocketMQ,Kafka;
2.1,▲ 消息队列MQ的作用:
- 异步处理:
- 可以将一些比较耗时的操作放到其他系统中,通过消息队列将需要进行处理的操作进行存储,其他系统可以消费消息队列中的数据处理需要的业务;
- 常见的应用场景:发送短信验证码,发送邮箱验证等;
- 系统解耦:
- 原先一个微服务是通过接口(http)调用另外一个微服务,耦合严重,当如果需要修改一个微服务,那么与之相关的微服务也要随之修改,否则可能会导致系统不可用;
- 使用消息队列进行解耦,由第一个微服务将操作消息放到消息队列,另外一个微服务可以从消息队列中获取进行处理,进行系统解耦;
- 流量削峰:
- 因为消息队列是低延迟,高可靠,高吞吐的,可以应对大量海量的并发量;
日志处理:
- 可以使用消息队列作为临时存储,或者一种通信管道;
3,* RabbitMQ的使用:
3.1,RabbitMQ在Linux上的安装:
此次安装是在阿里云应用服务器上进行安装;在安装RabbitMQ组件镜像之前先需要安装好 Docker应用容器引擎;Dockerhub官网:https://hub.docker.com/_/rabbitmq
- 可以使用消息队列作为临时存储,或者一种通信管道;
//顺序解释: docker运行rabbitmq镜像,设置用户名为:root 设置密码为:root //控制台登录密码 设置容器名称:mq 设置主机名; 设置宿主机端口 15672 设置MQ的通讯端口 5672 -d 设置后台运行 设置镜像名称:镜像别名;
> **查看运行状态:# docker ps**
> ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652014252249-985fc716-28f8-4a87-9ad0-385615a41eca.png#clientId=u9d14019d-99d7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=104&id=ud9207ae8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=104&originWidth=1152&originalType=binary&ratio=1&rotation=0&showTitle=false&size=260512&status=done&style=none&taskId=u2bc6eaa7-4dff-4a25-a8d3-dd70e9a562d&title=&width=1152)
3. **访问**[http://119.23.63.60:15672/#/](http://119.23.63.60:15672/#/)**进入RabbitMQ控制台:**![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652014374942-37b04dd1-df34-4d16-8b8f-9a8f354b7f8a.png#clientId=u9d14019d-99d7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=325&id=uadf987d4&margin=%5Bobject%20Object%5D&name=image.png&originHeight=698&originWidth=1301&originalType=binary&ratio=1&rotation=0&showTitle=false&size=61597&status=done&style=none&taskId=ued3f7d21-ef0b-4f08-8869-092a4a42da8&title=&width=605)
3. **看到上述页面则证明安装成功;**
<a name="pMGdI"></a>
## 3.2,RabbitMQ控制台界面的介绍:
<a name="YmDWh"></a>
### 3.2.1,任务栏:
![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652057394028-2cf3c63d-6f81-4381-b652-472958697bb5.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=114&id=u8f1264ae&margin=%5Bobject%20Object%5D&name=image.png&originHeight=114&originWidth=617&originalType=binary&ratio=1&rotation=0&showTitle=false&size=8466&status=done&style=none&taskId=u1eb16732-db93-4361-9767-24ef4794560&title=&width=617)
- **Channel:**操作MQ的工具;通道;
- Connections:查看消息连接;
- 其他的略;
<a name="JBC6T"></a>
## 3.3,* RabbitMQ的结构与概念:
> **publisher:消息发送者; consumer:消费者**
![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652057150879-c7aa70f9-abb3-4eb4-85e5-5f6631ecbda2.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=422&id=u6de1cdd3&margin=%5Bobject%20Object%5D&name=image.png&originHeight=422&originWidth=1016&originalType=binary&ratio=1&rotation=0&showTitle=false&size=257143&status=done&style=none&taskId=ub904bcd3-3551-4c4a-b01b-942b63612a6&title=&width=1016)
- **流程原理:**
**当程序有用户进行访问的时候,会创建一个具体隔离性的虚拟主机,初始化交换机和消息队列,交换机会接收消息发送者的消息,再路由到消息队列中,消息队列会对消息进行缓存,然后消费者将通过订阅的消息队列来接收对应消息队列中的消息;**
> **publisher:消息发送者;**
> **exchange:消息交换机;用于路由消息到队列**
> **queue:消息队列;**
> **consumer:消费者订阅队列:消费者从消息队列里面获取并处理消息;**
> **VirtualHost:虚拟主机,每个客户访问都会创建一个虚拟主机,具有隔离性,各个虚拟主机之间相互隔离;**
<a name="e1B3f"></a>
## **3.4,常见的消息模型:**
**又称:Basic Queue 简单队列模型**
> **官方给出的消息模型示例:一共有7种,接下来的是其中的5种,也是常用的5种;**
> **Tips:在所有模型或者任何消息队列接收消息都会由交换机进行路由转发,不然,怎么进行消息转发呢?是吧;在简单模型中,同样的也会存在一个默认的交换机进行路由;**
<a name="jZWpv"></a>
### **3.4.1,HelloWorld模型:(简单模型)**
- **该模型是基于最基础的消息队列模型来实现的,只包括三个角色:**
- **publisher:消息发布者,将消息发布到消息队列;**
- **queue:消息队列,负责接收并缓存消息,也是流量削峰的核心组件;**
- **consumer:订阅队列,处理消息队列中的消息;**
![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652058098213-3065240b-426b-4398-b5f4-accf0f337cfb.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=95&id=u39a0779a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=95&originWidth=894&originalType=binary&ratio=1&rotation=0&showTitle=false&size=45983&status=done&style=none&taskId=u36ff08ff-9000-4f4e-bf7e-e711ab48ead&title=&width=894)
<a name="kENQL"></a>
### **3.4.2,**WorkQueue 工作队列模型:
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652085444153-4b6d5777-6cf7-4c6a-b144-10936bb66c63.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=RBXdi&margin=%5Bobject%20Object%5D&name=image.png&originHeight=369&originWidth=912&originalType=binary&ratio=1&rotation=0&showTitle=false&size=81837&status=done&style=none&taskId=ucf80c544-5744-4e96-940a-a5b3a09be48&title=&width=620)
<a name="Yf8SG"></a>
### 3.4.3,发布订阅模型:
---
<a name="RgnDL"></a>
## **3.5,* MQ的入门案例:(不使用AMQP实现)**
> 也是在后面的AMQP的底层原理;
1. **在父模块中pom文件导入依赖:**
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>top.jztice5</groupId>
<artifactId>mq-demo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>consumer</module>
<module>publisher</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<parent>
<artifactId>spring-cloud-starter-parent</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>2021.0.2</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
2. **编写publisher的连接:**
public class PublisherTest {
public static void main (String[] args) {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.132.130");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
3. **编写consumer的连接:**
public static void main (String[] args) {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.132.130");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
4,* SpringAMQP:
4.1,什么是SpringAMQP:
1. **AMQP:**
1. 全程为:Advanceed Message Queuing Protocol,是用于应用程序或之间传递消息的开发标准。与语言和平台没有关联;
2. **而SpringAMQP:**
1. **是基于AMQP协议的一套API规范;**
1. **包含了两个部分:**
1. **Spring-AMQP是基础抽象;**
1. **Spring-rabbit是底层的默认实现;**
利用SpringAMQP实现上述官方模型:
4.2,Basic Queue 简单队列模型案例:
- **模型结构:**
4.2.1,导入依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
4.2.2,在publisher服务中利用RabbitTemplate发送消息到Simple.queue队列:
1. **在publisher服务中编写application.yml文件,配置连接信息:**
spring:
rabbitmq:
host: 119.23.63.60
port: 5672 #是消息服务的端口
virtual-host: / #虚拟主机路径
username: root
password: root
虚拟主机名与用户对应,查看位置:
2. **在服务中创建一个测试类,编写测试方法:**
1. 注入RabbitTemplate对象来实现消息的
RabbitTemplate:Rabbit的模板工具类
convertAndSend:发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Jztice5
* @date 2022年05月09日 9:54
*/
@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {
@Autowired (required = false)
private RabbitTemplate rabbitTemplate;
@Test
public void testRabbitTemplate () {
//设置目标队列名
String queueName = "simple.queue";
//设置要发送的消息
String message = "hello,spring,ohhhhhhhhhhhhhhhhh";
rabbitTemplate.convertAndSend(queueName, message);
}
}
注意:此方法,先要有对应名字的消息队列才能存放发送的消息,程序不会帮你新建队列; 在后续的使用方法会自动帮你创建;
4.2.3,* 在consumer消费者服务中编写消费逻辑,绑定(订阅)simplle.queue队列:
1. **在consumer中编写application.yml文件,配置连接信息:**
spring:
rabbitmq:
host: 119.23.63.60
port: 5672
virtual-host: /
username: root
password: root
与发送者的配置一致
2. **在consumer中编写消费逻辑类:**
1. **记得要加上@Component注解,**让boot程序识别为bean对象;
1. 因为消息的接收与投递都由SpringBoot来进行,因此需要在程序注册为bean对象,让SpringBoot来对该bean实现自动装配;
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {
//声明目标队列名称
//绑定(订阅)消息队列的名称
@RabbitListener (queues = "simple.queue")
public void setTextConsumer(String msg){
//msg:是SpringBoot通过注解@RabbitListener识别这是一个消息队列的订阅者然后将消息投递到这里的;
System.out.println("msg = " + msg);
}
}
当有消费者拉取了消息队列里面的消息,那就会阅后即焚,消息队列里面的东西就不见了;
- 运行启动类让SpringBoot程序来处理消息的接收和投递;![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652084946222-66d24f8d-8e48-496a-a851-1edfdd8dc356.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=60&id=u4777cac5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=60&originWidth=419&originalType=binary&ratio=1&rotation=0&showTitle=false&size=6185&status=done&style=none&taskId=u7a6a77a9-772c-423a-ab95-3f38e11e539&title=&width=419)
4.3,WorkQueue 工作队列模型案例:
- **模型结构:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652085444153-4b6d5777-6cf7-4c6a-b144-10936bb66c63.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=ud3935e87&margin=%5Bobject%20Object%5D&name=image.png&originHeight=369&originWidth=912&originalType=binary&ratio=1&rotation=0&showTitle=false&size=81837&status=done&style=none&taskId=ucf80c544-5744-4e96-940a-a5b3a09be48&title=&width=620)
- **在该模型中,在发送一条消息的时候,消费者之间会产生竞争关系,只会有一个消费者能接收到;**
- **该模型的主要作用:**
- 工作队列,可以提高消息处理速度,避免队列消息的堆积;
4.3.1,在publisher中定义测试方法,每秒产生50条消息,发送到simple.queue:
package top.jztice5.mq.helloword;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Jztice5
* @date 2022年05月09日 9:54
*/
@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {
@Autowired (required = false)
private RabbitTemplate rabbitTemplate;
@Test
public void testRabbitTemplate () throws InterruptedException {
String queueName = "simple.queue";
String message = "hello,spring,ohhhhhhhhhhhhhhhhh--";
for (int i = 1 ; i <= 50 ; i++) {
rabbitTemplate.convertAndSend(queueName, message+i);
Thread.sleep(20);
}
}
}
4.3.2,在consumer中定义两个消息监听者,都监听simple.queue队列:
- 设置消费者A每秒处理50条消息,消费者B每秒处理10条消息:
- **使用@RabbitListener注解声明队列;**
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {
//声明目标队列名称
@RabbitListener (queues = "simple.queue")
public void setTextWork1Consumer(String msg) throws InterruptedException {
System.out.println("消费者A = " + msg);
Thread.sleep(20);
}
//声明目标队列名称
@RabbitListener (queues = "simple.queue")
public void setTextWork2Consumer(String msg) throws InterruptedException {
System.err.println("消费者B = " + msg);
//让B比A慢一点;
Thread.sleep(200);
}
}
4.3.3,运行结果分析:
- **可见,即使消费者B的消费能力比消费者A要慢,但是拿到的消息的数量依然和消费者A差不多,导致运行时间缓慢;**
4.3.4,* 问题的优化:
- **这里引入了一种配置,称为消息预取限制;prefetch**
- 可以设置消费者能获取多少条消息,只有所有获取的消息都被消费者处理完成后才能获取其他消息;(避免眼阔肚窄)
1. **在消费者服务中修改application.yml文件设置 prefetch值:**
spring:
rabbitmq:
host: 119.23.63.60
port: 5672
virtual-host: /
username: root
password: root
# 设置监听者配置
listener:
simple:
prefetch: 1 # 每次一个消费者只能获取一条消息,当消息处理完后才能获取下一条消息;
4.4,* 发布订阅模型案例:(标准模型)
这个模型与上述的都不同,此模型引入了一个新组件:交换机exchange;
- 与之前模型的最大区别就是:允许将同一消息通过交换机发送给多个消费者;
该模型的重点:交换机:所有的消息都要经过交换机进行转发路由;不参与消息的存储;、
- **常见的三种交换机类型:**
1. **Fanout :广播;**
1. **Direct : 路由;**
1. **Topic:话题;**
各类型的详情请看下:使用AMQP;
4.4.1,* FanoutExchange:广播交换机
注意:在此模型中,在发送一条消息的情况下,所有消费者都会收到;还是同时的;不像无交换机模型那种竞争接收;
4.4.1.1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定:
1. **在新建一个配置类FanoutConfig,在类中声明FanoutExchange,Queue并绑定关系对象Binding:**
1. **交换机,队列,绑定关系**
1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652188872269-c20c6a22-8a7a-437a-b395-6c6c8eba49fd.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=81&id=u1da6ae70&margin=%5Bobject%20Object%5D&name=image.png&originHeight=122&originWidth=268&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4406&status=done&style=none&taskId=u9cddc7e8-5c93-48c0-aae1-530ae01fbc2&title=&width=178.66666666666666)
1. **记得加上@Configuration;**
package top.jztice5.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Jztice5
* @date 2022年05月10日 21:06
*/
@Configuration
public class FanoutConfig {
//声明FanoutExchange交换机bean;
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
//声明Queue1的bean
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
//声明Queue2的bean
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
//声明绑定关系的Bean,并绑定队列1与交换机
@Bean
public Binding bindingFanoutBindingQueue1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
//声明绑定关系的Bean,并绑定队列2与交换机
@Bean
public Binding bindingFanoutBindingQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
//这里的队列和交换机都会自动创建
}
- 绑定成功效果:
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652189143531-addde9c0-96bf-440d-9a50-469305291b71.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=35&id=u20a459d5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=53&originWidth=430&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4721&status=done&style=none&taskId=u4006ea16-2d0c-4877-b749-4a7a215d162&title=&width=286.6666666666667)
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652189155324-524d61cc-5bd4-4c6c-929d-9fa0eb9753e7.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=211&id=u5f412f20&margin=%5Bobject%20Object%5D&name=image.png&originHeight=343&originWidth=550&originalType=binary&ratio=1&rotation=0&showTitle=false&size=19020&status=done&style=none&taskId=u84940884-9ac8-451e-b765-4fa18033bf3&title=&width=338.66668701171875)
4.4.1.2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2:
package top.jztice5.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
/**
* @author Jztice5
* @date 2022年05月09日 16:23
*/
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {
//声明目标队列名称
@RabbitListener (queues = "fanout.queue1")
public void setTextFanoutConsumer1(String msg) throws InterruptedException {
System.out.println("消费者A = " + msg +"---"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener (queues = "fanout.queue2")
public void setTextFanoutConsumer2(String msg) throws InterruptedException {
System.out.println("消费者B = " + msg +"---"+ LocalTime.now());
Thread.sleep(20);
}
}
4.4.1.3,在publisher中编写方法,向itcast.fanout发送消息:
/**
* 发送消息
*/
@Test
public void testFanoutExchange () {
//交换机名称
String exchangeName = "itcast.fanout";
//消息
String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";
//发送
rabbitTemplate.convertAndSend(exchangeName,"",msg);
}
4.4.2,* DirectExchange:路由交换机
- 这个交换机会将接收到的消息根据规则路由到指定的queue,也被称为**路由模式(routes);**
- 每个Queue都与Exchange设置一个BindingKey;
- 当发送者发送消息时,指定消息的Queue;
- Exchange就会将消息路由到BindingKey与消息RoutingKey一致的队列;
![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652190556513-c07dfc02-0ace-4a80-a5b8-daf9aa30b902.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=318&id=ud800f93b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=556&originWidth=1813&originalType=binary&ratio=1&rotation=0&showTitle=false&size=229356&status=done&style=none&taskId=u58ad48ba-4de3-4861-b454-f28cbbcebec&title=&width=1035.666748046875)
注意:DirectExchange交换机是支持不同的队列queue设置相同的bindingKey;这种情况,就会将消息转发到所有与key匹配的队列; 就能实现类似于广播交换机的效果,但是,要注意key的设置;
步骤和广播交换机不同!
4.4.2.1,在监听者类方法中利用@RabbitListener声明Exchange,Queue,RoutingKey:
- 在之前的FanoutExchange中是使用Bean来声明交换机与队列绑定关系;
- **在这里我们使用一种新的也是常用的方式来进行声明:**
- **使用@RabbitListener注解的相关属性在监听者方法上进行声明即可:**
- **编写监听者方法分别监听:**direct.queue1和direct.queue2:
package top.jztice5.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author Jztice5
* @date 2022年05月09日 16:23
*/
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {
//声明queue1的绑定关系和key
@RabbitListener (bindings = @QueueBinding (
//声明队列名称
value = @Queue (name = "direct.queue1"),
//声明交换机名称,以及类型(默认类型是direct)
exchange = @Exchange (name = "itcast.direct" , type = ExchangeTypes.DIRECT),
//声明队列key值(RoutingKey)
key = {"red" , "bule"}
))
public void listenDirectQueue1 (String msg) {
System.out.println("消费者1收到了消息" + msg);
}
//声明queue2的绑定关系和key
@RabbitListener (bindings = @QueueBinding (
//声明队列名称
value = @Queue (name = "direct.queue2"),
//声明交换机名称,以及类型(默认类型是direct)
exchange = @Exchange (name = "itcast.direct" , type = ExchangeTypes.DIRECT),
//声明队列key值(RoutingKey)
key = {"red" , "bule"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者2收到了消息" + msg);
}
}
//声明交换机名称,以及类型(默认类型是direct), type = ExchangeTypes.DIRECT ; 在AMQP的api中,交换机类型默认为:Direct;
- **启动成功效果:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652192894642-7bc7bbe7-8b2c-4933-9110-7af022b02b48.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=25&id=u3c718122&margin=%5Bobject%20Object%5D&name=image.png&originHeight=38&originWidth=440&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4379&status=done&style=none&taskId=udf9f7945-4492-4693-8447-f8b08edf878&title=&width=293.3333333333333)
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652193411770-853de3a9-62c3-4d1a-b7b0-4ee05b3549ea.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=u2dc1d5a7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=480&originWidth=539&originalType=binary&ratio=1&rotation=0&showTitle=false&size=32741&status=done&style=none&taskId=ud6e98fd6-0564-4158-ae8c-ef6625062a3&title=&width=281.3333435058594)
4.4.2.2,在publisher中编写方法,向itcast.direct发送消息:
- 和之前的类似,只需要在 rabbitTemplate.convertAndSend方法中指定RoutingKey即可;
/**
* 发送消息
*/
@Test
public void testDirectExchange () {
//交换机名称
String exchangeName = "itcast.direct";
//消息
String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";
//还要设置routingKey
rabbitTemplate.convertAndSend(exchangeName,"bule",msg);
}
注意:路由器名要对应;不然任意找不到bug;
- **Direct交换机和Fanout交换机的区别:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652193771367-5d5b8853-82e6-4237-9fb3-dbb17299349f.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=135&id=u754067e5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=202&originWidth=946&originalType=binary&ratio=1&rotation=0&showTitle=false&size=107918&status=done&style=none&taskId=u2c2238dc-fa6d-4694-ba82-0622f0c8a10&title=&width=630.6666666666666)
4.4.3,▲ TopicExchange:话题交换机 (常用)
- TopicExchange与DirectExchange类似,但主要的**区别**是RoutingKey必须是多个单词的列表,并且要用 **. **号进行分割**;**
- **在Topic中,Queue与Exchange指定BindingKey可以使用通配符:**
- **# :代指0个或多个单词;**
- *** : 代指一个单词;**
- **案例结构:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194434182-3234d8a0-7824-41b0-8731-0cf465cdadb5.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=231&id=u5d4ea455&margin=%5Bobject%20Object%5D&name=image.png&originHeight=347&originWidth=1411&originalType=binary&ratio=1&rotation=0&showTitle=false&size=168268&status=done&style=none&taskId=uc8c98c22-c8e2-440c-bb0c-c895760ea13&title=&width=940.6666666666666)
4.4.3.1,* 在监听者类方法中利用@RabbitListener声明Exchange,Queue,RoutingKey:
- **使用@RabbitListener注解的相关属性在监听者方法上进行声明即可:**
- **编写监听者方法分别监听:**topic.queue1 和 topic.queue2:
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author Jztice5
* @date 2022年05月09日 16:23
*/
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {
//声明queue1的绑定关系和key
@RabbitListener (bindings = @QueueBinding (
//声明队列名称
value = @Queue (name = "topic.queue1"),
//声明交换机名称,以及类型(默认类型是direct)
exchange = @Exchange (name = "itcast.topic" , type = ExchangeTypes.TOPIC),
//声明队列key值(RoutingKey)
key = "china.#"
))
public void listenTopicQueue1 (String msg) {
System.out.println("消费者1收到了消息" + msg);
}
//声明queue2的绑定关系和key
@RabbitListener (bindings = @QueueBinding (
//声明队列名称
value = @Queue (name = "topic.queue2"),
//声明交换机名称,以及类型(默认类型是direct)
exchange = @Exchange (name = "itcast.topic" , type = ExchangeTypes.TOPIC),
//声明队列key值(RoutingKey)
key = "#.news"
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者2收到了消息" + msg);
}
}
注意:要设置交换机的类型为:TOPIC;
- **启动成功效果:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194824310-ba1f11df-84e2-4ffa-8520-9e7722044794.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=31&id=ufbd89bf6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=47&originWidth=430&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4534&status=done&style=none&taskId=u86ad7078-35dd-4194-8aff-7be720842e0&title=&width=286.6666666666667)
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194837744-d74deb82-96f3-4cd2-a624-787aa0830fee.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=202&id=ub5453842&margin=%5Bobject%20Object%5D&name=image.png&originHeight=345&originWidth=539&originalType=binary&ratio=1&rotation=0&showTitle=false&size=22523&status=done&style=none&taskId=u49c72a44-21b5-4cbb-a37b-b0502f9c70f&title=&width=316.3333435058594)
4.4.3.2,* 在publisher中编写方法,向itcast.topic发送消息:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Jztice5
* @date 2022年05月09日 9:54
*/
@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {
//注入RabbitTemplate
@Autowired (required = false)
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
public void testTopicExchange () {
//交换机名称
String exchangeName = "itcast.topic";
//消息
String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";
//还要设置routingKey
rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}
}
- **消息接收的情况举例:**
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652195410210-3097c008-fac6-4d1b-aa13-108a8c8cd976.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=118&id=u76b219b2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=177&originWidth=509&originalType=binary&ratio=1&rotation=0&showTitle=false&size=16068&status=done&style=none&taskId=u5dff2ca3-2df7-483d-83b8-6c792810635&title=&width=339.3333333333333)
发送的消息 | 目标队列 |
---|---|
china.news | 两个都接收到 |
dada.news | 队列2收到 |
china.dada | 队列1收到 |
4.5,AMQP的注解:
注解 | 作用 |
---|---|
@RabbitListener | 声明(设置)绑定关系,交换机名称和类型,消息队列名称,RoutingKey值(队列) |