1,异步通讯和同步通讯:

  1. - **异步通讯:单个服务可以和多个服务进行通讯;**
  2. - 同步通讯:单个服务只能与一个服务进行通讯;

1.1,同步调用的问题:

  1. - **微服务之间采用Feign的调用就是同步方式,会出现以下的问题:**
  2. - **耦合度高:**每次添加新需求都要更改原来的代码;
  3. - **性能低下:**调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用完成的时间;
  4. - **资源浪费:**调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,在高并发的场景下会极度浪费资源;
  5. - **级联失败:**如果服务提供者出现问题,那么所有的调用方都会受影响,产生多米诺骨牌效应;导致整个微服务群崩溃;

1.2,异步调用的优缺:

  1. - **优点:;**
  2. - 服务解耦;
  3. - 性能提升,吞吐量提高;
  4. - 服务没有强依赖,不用担心级联失败问题;
  5. - **流量削峰**:一个队列在一端承接瞬时的流量洪峰(缓存),在另一端平滑地将消息推出去,按照系统的性能稳步处理访问请求;
  6. - **缺点:**
  7. - 依赖Broker的可靠性,安全性,吞吐能力;
  8. - 架构复杂,业务没有明显的流程线,不好追踪管理;
  9. - 响应会比同步通讯会有延迟;

1.3,* 异步调用的实现方案:

  1. - **引入事件代理者,Broker:一个事件驱动器**
  2. - **解决事件发布者和订阅者之间的耦合,两者中间引入一个中间人Broker,使得两者不是直接进行通讯,而是让事件发布者将事件分布到中间件中,让订阅者在中间件订阅事件,当有事件更新直接获取对应的事件就行了;**
  3. - **示例:事件驱动架构:**

image.png

1.4,同步通讯与异步通讯分别的使用场景:

  - **同步通讯:(大部分情况下)**
     - 一个服务只需要访问一个服务且(一对一)需要**及时收到响应**的情况下就能使用同步,如登录服务,只需要进行登录验证环节就足够了,那么就能使用同步通讯;
     - **在大部分业务情况下,都需要实时响应,就比如:查询列表就必须要实时获取列表数据的响应;**
  - **异步通讯:**
     - 在面对**高并发**,**高吞吐量**,而且单个服务要服务不同的服务(单对多)来处理不同的业务且不需要实时响应的需求,那么优先选择异步通讯;

2,* 什么是MQ:

MQ(MessageQueue)

  - **MQ**:就是就是存放消息的队列,译名为:**消息队列**;也就是在事件驱动架构中的Broker;

当有消费者拉取了消息队列里面的消息,那就会阅后即焚,消息队列里面的东西就不见了;

     - 而实现消息队列的技术架构被称为:消息队列中间件;
  - 在市面上常见的几种**消息队列中间件**:

image.png

在国内主流的中间件有:RocketMQ,Kafka;

2.1,▲ 消息队列MQ的作用:

  • 异步处理:
    • 可以将一些比较耗时的操作放到其他系统中,通过消息队列将需要进行处理的操作进行存储,其他系统可以消费消息队列中的数据处理需要的业务;
    • 常见的应用场景:发送短信验证码,发送邮箱验证等;
  • 系统解耦:
    • 原先一个微服务是通过接口(http)调用另外一个微服务,耦合严重,当如果需要修改一个微服务,那么与之相关的微服务也要随之修改,否则可能会导致系统不可用;
    • 使用消息队列进行解耦,由第一个微服务将操作消息放到消息队列,另外一个微服务可以从消息队列中获取进行处理,进行系统解耦;
  • 流量削峰:
    • 因为消息队列是低延迟,高可靠,高吞吐的,可以应对大量海量的并发量;
  • 日志处理:

    • 可以使用消息队列作为临时存储,或者一种通信管道;

      3,* RabbitMQ的使用:

      3.1,RabbitMQ在Linux上的安装:

      此次安装是在阿里云应用服务器上进行安装;在安装RabbitMQ组件镜像之前先需要安装好 Docker应用容器引擎;Dockerhub官网:https://hub.docker.com/_/rabbitmq

    1. 在Dockerhub官网中拉取RabbitMQ镜像:image.png

      docker pull rabbitmq
      
    2. 执行一下命令运行MQ容器完成对MQ的安装与配置: ```java

      docker run -e RABBITMQ_DEFAULT_USER=root \

      -e RABBITMQ_DEFAULT_PASS=root \ —name mq \ —hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management

//顺序解释: docker运行rabbitmq镜像,设置用户名为:root 设置密码为:root //控制台登录密码 设置容器名称:mq 设置主机名; 设置宿主机端口 15672 设置MQ的通讯端口 5672 -d 设置后台运行 设置镜像名称:镜像别名;

> **查看运行状态:# docker ps**
> ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652014252249-985fc716-28f8-4a87-9ad0-385615a41eca.png#clientId=u9d14019d-99d7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=104&id=ud9207ae8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=104&originWidth=1152&originalType=binary&ratio=1&rotation=0&showTitle=false&size=260512&status=done&style=none&taskId=u2bc6eaa7-4dff-4a25-a8d3-dd70e9a562d&title=&width=1152)

      3. **访问**[http://119.23.63.60:15672/#/](http://119.23.63.60:15672/#/)**进入RabbitMQ控制台:**![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652014374942-37b04dd1-df34-4d16-8b8f-9a8f354b7f8a.png#clientId=u9d14019d-99d7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=325&id=uadf987d4&margin=%5Bobject%20Object%5D&name=image.png&originHeight=698&originWidth=1301&originalType=binary&ratio=1&rotation=0&showTitle=false&size=61597&status=done&style=none&taskId=ued3f7d21-ef0b-4f08-8869-092a4a42da8&title=&width=605)
      3. **看到上述页面则证明安装成功;**
<a name="pMGdI"></a>
## 3.2,RabbitMQ控制台界面的介绍:
<a name="YmDWh"></a>
### 3.2.1,任务栏:
![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652057394028-2cf3c63d-6f81-4381-b652-472958697bb5.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=114&id=u8f1264ae&margin=%5Bobject%20Object%5D&name=image.png&originHeight=114&originWidth=617&originalType=binary&ratio=1&rotation=0&showTitle=false&size=8466&status=done&style=none&taskId=u1eb16732-db93-4361-9767-24ef4794560&title=&width=617)

      - **Channel:**操作MQ的工具;通道;
      - Connections:查看消息连接;
      - 其他的略;
<a name="JBC6T"></a>
## 3.3,* RabbitMQ的结构与概念:
> **publisher:消息发送者;        consumer:消费者**

![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652057150879-c7aa70f9-abb3-4eb4-85e5-5f6631ecbda2.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=422&id=u6de1cdd3&margin=%5Bobject%20Object%5D&name=image.png&originHeight=422&originWidth=1016&originalType=binary&ratio=1&rotation=0&showTitle=false&size=257143&status=done&style=none&taskId=ub904bcd3-3551-4c4a-b01b-942b63612a6&title=&width=1016)

      - **流程原理:**

**当程序有用户进行访问的时候,会创建一个具体隔离性的虚拟主机,初始化交换机和消息队列,交换机会接收消息发送者的消息,再路由到消息队列中,消息队列会对消息进行缓存,然后消费者将通过订阅的消息队列来接收对应消息队列中的消息;**
> **publisher:消息发送者;**
> **exchange:消息交换机;用于路由消息到队列**
> **queue:消息队列;**
> **consumer:消费者订阅队列:消费者从消息队列里面获取并处理消息;**
> **VirtualHost:虚拟主机,每个客户访问都会创建一个虚拟主机,具有隔离性,各个虚拟主机之间相互隔离;**

<a name="e1B3f"></a>
## **3.4,常见的消息模型:**
**又称:Basic Queue 简单队列模型**
> **官方给出的消息模型示例:一共有7种,接下来的是其中的5种,也是常用的5种;**

> **Tips:在所有模型或者任何消息队列接收消息都会由交换机进行路由转发,不然,怎么进行消息转发呢?是吧;在简单模型中,同样的也会存在一个默认的交换机进行路由;**

<a name="jZWpv"></a>
### **3.4.1,HelloWorld模型:(简单模型)**

      - **该模型是基于最基础的消息队列模型来实现的,只包括三个角色:**
         - **publisher:消息发布者,将消息发布到消息队列;**
         - **queue:消息队列,负责接收并缓存消息,也是流量削峰的核心组件;**
         - **consumer:订阅队列,处理消息队列中的消息;**

![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652058098213-3065240b-426b-4398-b5f4-accf0f337cfb.png#clientId=u9278bc75-6f29-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=95&id=u39a0779a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=95&originWidth=894&originalType=binary&ratio=1&rotation=0&showTitle=false&size=45983&status=done&style=none&taskId=u36ff08ff-9000-4f4e-bf7e-e711ab48ead&title=&width=894)
<a name="kENQL"></a>
### **3.4.2,**WorkQueue 工作队列模型:

      - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652085444153-4b6d5777-6cf7-4c6a-b144-10936bb66c63.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=RBXdi&margin=%5Bobject%20Object%5D&name=image.png&originHeight=369&originWidth=912&originalType=binary&ratio=1&rotation=0&showTitle=false&size=81837&status=done&style=none&taskId=ucf80c544-5744-4e96-940a-a5b3a09be48&title=&width=620)
<a name="Yf8SG"></a>
### 3.4.3,发布订阅模型:

---

<a name="RgnDL"></a>
## **3.5,* MQ的入门案例:(不使用AMQP实现)**
> 也是在后面的AMQP的底层原理;

      1. **在父模块中pom文件导入依赖:**
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
         xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>top.jztice5</groupId>
    <artifactId>mq-demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>consumer</module>
        <module>publisher</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <artifactId>spring-cloud-starter-parent</artifactId>
        <groupId>org.springframework.cloud</groupId>
        <version>2021.0.2</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>



</project>
  2. **编写publisher的连接:**
public class PublisherTest {

    public static void main (String[] args) {

        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.132.130");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();
    }

}
  3. **编写consumer的连接:**
 public static void main (String[] args) {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.132.130");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }

4,* SpringAMQP:

4.1,什么是SpringAMQP:

  1. **AMQP:**
     1. 全程为:Advanceed Message Queuing Protocol,是用于应用程序或之间传递消息的开发标准。与语言和平台没有关联;
  2. **而SpringAMQP:**
     1. **是基于AMQP协议的一套API规范;**
     1. **包含了两个部分:**
        1. **Spring-AMQP是基础抽象;**
        1. **Spring-rabbit是底层的默认实现;**

利用SpringAMQP实现上述官方模型:

4.2,Basic Queue 简单队列模型案例:

  - **模型结构:**

image.png

4.2.1,导入依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

4.2.2,在publisher服务中利用RabbitTemplate发送消息到Simple.queue队列:

     1. **在publisher服务中编写application.yml文件,配置连接信息:**
spring:
  rabbitmq:
    host: 119.23.63.60
    port: 5672  #是消息服务的端口
    virtual-host: / #虚拟主机路径
    username: root
    password: root

虚拟主机名与用户对应,查看位置:image.pngimage.png

     2. **在服务中创建一个测试类,编写测试方法:**
        1. 注入RabbitTemplate对象来实现消息的

RabbitTemplate:Rabbit的模板工具类

convertAndSend:发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Jztice5
 * @date 2022年05月09日 9:54
 */

@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Autowired (required = false)
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testRabbitTemplate () {
        //设置目标队列名
        String queueName = "simple.queue";
        //设置要发送的消息
        String message = "hello,spring,ohhhhhhhhhhhhhhhhh";
        rabbitTemplate.convertAndSend(queueName, message);

    }

}

注意:此方法,先要有对应名字的消息队列才能存放发送的消息,程序不会帮你新建队列; 在后续的使用方法会自动帮你创建;

4.2.3,* 在consumer消费者服务中编写消费逻辑,绑定(订阅)simplle.queue队列:

     1. **在consumer中编写application.yml文件,配置连接信息:**
spring:
  rabbitmq:
    host: 119.23.63.60
    port: 5672
    virtual-host: /
    username: root
    password: root

与发送者的配置一致

     2. **在consumer中编写消费逻辑类:**
        1. **记得要加上@Component注解,**让boot程序识别为bean对象;
        1. 因为消息的接收与投递都由SpringBoot来进行,因此需要在程序注册为bean对象,让SpringBoot来对该bean实现自动装配;
//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {

    //声明目标队列名称
    //绑定(订阅)消息队列的名称 
    @RabbitListener (queues = "simple.queue")
    public void setTextConsumer(String msg){
        //msg:是SpringBoot通过注解@RabbitListener识别这是一个消息队列的订阅者然后将消息投递到这里的;
        System.out.println("msg = " + msg);

    }

}

当有消费者拉取了消息队列里面的消息,那就会阅后即焚,消息队列里面的东西就不见了;

     - 运行启动类让SpringBoot程序来处理消息的接收和投递;![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652084946222-66d24f8d-8e48-496a-a851-1edfdd8dc356.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=60&id=u4777cac5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=60&originWidth=419&originalType=binary&ratio=1&rotation=0&showTitle=false&size=6185&status=done&style=none&taskId=u7a6a77a9-772c-423a-ab95-3f38e11e539&title=&width=419)

4.3,WorkQueue 工作队列模型案例:

  - **模型结构:**
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652085444153-4b6d5777-6cf7-4c6a-b144-10936bb66c63.png#clientId=ua50d485d-2a35-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=ud3935e87&margin=%5Bobject%20Object%5D&name=image.png&originHeight=369&originWidth=912&originalType=binary&ratio=1&rotation=0&showTitle=false&size=81837&status=done&style=none&taskId=ucf80c544-5744-4e96-940a-a5b3a09be48&title=&width=620)
  - **在该模型中,在发送一条消息的时候,消费者之间会产生竞争关系,只会有一个消费者能接收到;**
  - **该模型的主要作用:**
     - 工作队列,可以提高消息处理速度,避免队列消息的堆积;

4.3.1,在publisher中定义测试方法,每秒产生50条消息,发送到simple.queue:

package top.jztice5.mq.helloword;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Jztice5
 * @date 2022年05月09日 9:54
 */

@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Autowired (required = false)
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testRabbitTemplate () throws InterruptedException {

        String queueName = "simple.queue";
        String message = "hello,spring,ohhhhhhhhhhhhhhhhh--";
        for (int i = 1 ; i <= 50 ; i++) {
            rabbitTemplate.convertAndSend(queueName, message+i);
            Thread.sleep(20);
        }


    }

}

4.3.2,在consumer中定义两个消息监听者,都监听simple.queue队列:

  - 设置消费者A每秒处理50条消息,消费者B每秒处理10条消息:
  - **使用@RabbitListener注解声明队列;**

//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {

    //声明目标队列名称
    @RabbitListener (queues = "simple.queue")
    public void setTextWork1Consumer(String msg) throws InterruptedException {

        System.out.println("消费者A = " + msg);
        Thread.sleep(20);
    }

    //声明目标队列名称
    @RabbitListener (queues = "simple.queue")
    public void setTextWork2Consumer(String msg) throws InterruptedException {

        System.err.println("消费者B = " + msg);
        //让B比A慢一点;
        Thread.sleep(200);

    }

}

4.3.3,运行结果分析:

image.png

  - **可见,即使消费者B的消费能力比消费者A要慢,但是拿到的消息的数量依然和消费者A差不多,导致运行时间缓慢;**

4.3.4,* 问题的优化:

  - **这里引入了一种配置,称为消息预取限制;prefetch**
     - 可以设置消费者能获取多少条消息,只有所有获取的消息都被消费者处理完成后才能获取其他消息;(避免眼阔肚窄)
  1. **在消费者服务中修改application.yml文件设置 prefetch值:**
spring:
  rabbitmq:
    host: 119.23.63.60
    port: 5672
    virtual-host: /
    username: root
    password: root
    # 设置监听者配置
    listener:
      simple:
        prefetch: 1 # 每次一个消费者只能获取一条消息,当消息处理完后才能获取下一条消息;

4.4,* 发布订阅模型案例:(标准模型)

这个模型与上述的都不同,此模型引入了一个新组件:交换机exchange;

  - 与之前模型的最大区别就是:允许将同一消息通过交换机发送给多个消费者;

image.png

该模型的重点:交换机所有的消息都要经过交换机进行转发路由;不参与消息的存储;、

  - **常见的三种交换机类型:**
     1. **Fanout :广播;**
     1. **Direct : 路由;**
     1. **Topic:话题;**

各类型的详情请看下:使用AMQP;

4.4.1,* FanoutExchange:广播交换机

image.png

注意:在此模型中,在发送一条消息的情况下,所有消费者都会收到;还是同时的;不像无交换机模型那种竞争接收;

4.4.1.1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定:

     1. **在新建一个配置类FanoutConfig,在类中声明FanoutExchange,Queue并绑定关系对象Binding:**
        1. **交换机,队列,绑定关系**
        1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652188872269-c20c6a22-8a7a-437a-b395-6c6c8eba49fd.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=81&id=u1da6ae70&margin=%5Bobject%20Object%5D&name=image.png&originHeight=122&originWidth=268&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4406&status=done&style=none&taskId=u9cddc7e8-5c93-48c0-aae1-530ae01fbc2&title=&width=178.66666666666666)
        1. **记得加上@Configuration;**
package top.jztice5.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Jztice5
 * @date 2022年05月10日 21:06
 */

@Configuration
public class FanoutConfig {

    //声明FanoutExchange交换机bean;
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("itcast.fanout");
    }

    //声明Queue1的bean
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    //声明Queue2的bean
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    //声明绑定关系的Bean,并绑定队列1与交换机
    @Bean
    public Binding bindingFanoutBindingQueue1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    //声明绑定关系的Bean,并绑定队列2与交换机
    @Bean
    public Binding bindingFanoutBindingQueue2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
    //这里的队列和交换机都会自动创建
}
  - 绑定成功效果:
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652189143531-addde9c0-96bf-440d-9a50-469305291b71.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=35&id=u20a459d5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=53&originWidth=430&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4721&status=done&style=none&taskId=u4006ea16-2d0c-4877-b749-4a7a215d162&title=&width=286.6666666666667)
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652189155324-524d61cc-5bd4-4c6c-929d-9fa0eb9753e7.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=211&id=u5f412f20&margin=%5Bobject%20Object%5D&name=image.png&originHeight=343&originWidth=550&originalType=binary&ratio=1&rotation=0&showTitle=false&size=19020&status=done&style=none&taskId=u84940884-9ac8-451e-b765-4fa18033bf3&title=&width=338.66668701171875)

4.4.1.2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2:

package top.jztice5.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * @author Jztice5
 * @date 2022年05月09日 16:23
 */

//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {

    //声明目标队列名称
    @RabbitListener (queues = "fanout.queue1")
    public void setTextFanoutConsumer1(String msg) throws InterruptedException {

        System.out.println("消费者A = " + msg +"---"+ LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener (queues = "fanout.queue2")
    public void setTextFanoutConsumer2(String msg) throws InterruptedException {

        System.out.println("消费者B = " + msg +"---"+ LocalTime.now());
        Thread.sleep(20);
    }


}

4.4.1.3,在publisher中编写方法,向itcast.fanout发送消息:

    /**
     * 发送消息
     */
    @Test
    public void testFanoutExchange () {
        //交换机名称
        String exchangeName = "itcast.fanout";
        //消息
        String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";

        //发送
        rabbitTemplate.convertAndSend(exchangeName,"",msg);

    }

4.4.2,* DirectExchange:路由交换机

  - 这个交换机会将接收到的消息根据规则路由到指定的queue,也被称为**路由模式(routes);**
     - 每个Queue都与Exchange设置一个BindingKey;
     - 当发送者发送消息时,指定消息的Queue;
     - Exchange就会将消息路由到BindingKey与消息RoutingKey一致的队列;

 ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652190556513-c07dfc02-0ace-4a80-a5b8-daf9aa30b902.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=318&id=ud800f93b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=556&originWidth=1813&originalType=binary&ratio=1&rotation=0&showTitle=false&size=229356&status=done&style=none&taskId=u58ad48ba-4de3-4861-b454-f28cbbcebec&title=&width=1035.666748046875)

注意:DirectExchange交换机是支持不同的队列queue设置相同的bindingKey;这种情况,就会将消息转发到所有与key匹配的队列; 就能实现类似于广播交换机的效果,但是,要注意key的设置;

步骤和广播交换机不同!

4.4.2.1,在监听者类方法中利用@RabbitListener声明Exchange,Queue,RoutingKey:

  - 在之前的FanoutExchange中是使用Bean来声明交换机与队列绑定关系;
  - **在这里我们使用一种新的也是常用的方式来进行声明:**
     - **使用@RabbitListener注解的相关属性在监听者方法上进行声明即可:**
     - **编写监听者方法分别监听:**direct.queue1和direct.queue2:
package top.jztice5.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author Jztice5
 * @date 2022年05月09日 16:23
 */

//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {

    //声明queue1的绑定关系和key
    @RabbitListener (bindings = @QueueBinding (
            //声明队列名称
            value = @Queue (name = "direct.queue1"),
            //声明交换机名称,以及类型(默认类型是direct)
            exchange = @Exchange (name = "itcast.direct" , type = ExchangeTypes.DIRECT),
            //声明队列key值(RoutingKey)
            key = {"red" , "bule"}
    ))
    public void listenDirectQueue1 (String msg) {
        System.out.println("消费者1收到了消息" + msg);
    }

    //声明queue2的绑定关系和key
    @RabbitListener (bindings = @QueueBinding (
            //声明队列名称
            value = @Queue (name = "direct.queue2"),
            //声明交换机名称,以及类型(默认类型是direct)
            exchange = @Exchange (name = "itcast.direct" , type = ExchangeTypes.DIRECT),
            //声明队列key值(RoutingKey)
            key = {"red" , "bule"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者2收到了消息" + msg);
    }

}

//声明交换机名称,以及类型(默认类型是direct), type = ExchangeTypes.DIRECT ; 在AMQP的api中,交换机类型默认为:Direct;

  - **启动成功效果:**
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652192894642-7bc7bbe7-8b2c-4933-9110-7af022b02b48.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=25&id=u3c718122&margin=%5Bobject%20Object%5D&name=image.png&originHeight=38&originWidth=440&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4379&status=done&style=none&taskId=udf9f7945-4492-4693-8447-f8b08edf878&title=&width=293.3333333333333)
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652193411770-853de3a9-62c3-4d1a-b7b0-4ee05b3549ea.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=251&id=u2dc1d5a7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=480&originWidth=539&originalType=binary&ratio=1&rotation=0&showTitle=false&size=32741&status=done&style=none&taskId=ud6e98fd6-0564-4158-ae8c-ef6625062a3&title=&width=281.3333435058594)

4.4.2.2,在publisher中编写方法,向itcast.direct发送消息:

  - 和之前的类似,只需要在 rabbitTemplate.convertAndSend方法中指定RoutingKey即可;
    /**
     * 发送消息
     */
    @Test
    public void testDirectExchange () {
        //交换机名称
        String exchangeName = "itcast.direct";
        //消息
        String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";

        //还要设置routingKey
        rabbitTemplate.convertAndSend(exchangeName,"bule",msg);

    }

注意:路由器名要对应;不然任意找不到bug;

  - **Direct交换机和Fanout交换机的区别:**
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652193771367-5d5b8853-82e6-4237-9fb3-dbb17299349f.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=135&id=u754067e5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=202&originWidth=946&originalType=binary&ratio=1&rotation=0&showTitle=false&size=107918&status=done&style=none&taskId=u2c2238dc-fa6d-4694-ba82-0622f0c8a10&title=&width=630.6666666666666)

4.4.3,▲ TopicExchange:话题交换机 (常用)

  -  TopicExchange与DirectExchange类似,但主要的**区别**是RoutingKey必须是多个单词的列表,并且要用 **. **号进行分割**;**
  - **在Topic中,Queue与Exchange指定BindingKey可以使用通配符:**
     - **# :代指0个或多个单词;**
     - *** : 代指一个单词;**

image.png

  - **案例结构:**
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194434182-3234d8a0-7824-41b0-8731-0cf465cdadb5.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=231&id=u5d4ea455&margin=%5Bobject%20Object%5D&name=image.png&originHeight=347&originWidth=1411&originalType=binary&ratio=1&rotation=0&showTitle=false&size=168268&status=done&style=none&taskId=uc8c98c22-c8e2-440c-bb0c-c895760ea13&title=&width=940.6666666666666)

4.4.3.1,* 在监听者类方法中利用@RabbitListener声明Exchange,Queue,RoutingKey:

     - **使用@RabbitListener注解的相关属性在监听者方法上进行声明即可:**
     - **编写监听者方法分别监听:**topic.queue1 和 topic.queue2:
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author Jztice5
 * @date 2022年05月09日 16:23
 */

//让boot程序识别为bean对象
@Component
public class SpringRabbitListener {

    //声明queue1的绑定关系和key
    @RabbitListener (bindings = @QueueBinding (
            //声明队列名称
            value = @Queue (name = "topic.queue1"),
            //声明交换机名称,以及类型(默认类型是direct)
            exchange = @Exchange (name = "itcast.topic" , type = ExchangeTypes.TOPIC),
            //声明队列key值(RoutingKey)
            key = "china.#"
    ))
    public void listenTopicQueue1 (String msg) {
        System.out.println("消费者1收到了消息" + msg);
    }

    //声明queue2的绑定关系和key
    @RabbitListener (bindings = @QueueBinding (
            //声明队列名称
            value = @Queue (name = "topic.queue2"),
            //声明交换机名称,以及类型(默认类型是direct)
            exchange = @Exchange (name = "itcast.topic" , type = ExchangeTypes.TOPIC),
            //声明队列key值(RoutingKey)
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg) {
        System.out.println("消费者2收到了消息" + msg);
    }

}

注意:要设置交换机的类型为:TOPIC;

  - **启动成功效果:**
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194824310-ba1f11df-84e2-4ffa-8520-9e7722044794.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=31&id=ufbd89bf6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=47&originWidth=430&originalType=binary&ratio=1&rotation=0&showTitle=false&size=4534&status=done&style=none&taskId=u86ad7078-35dd-4194-8aff-7be720842e0&title=&width=286.6666666666667)
  - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652194837744-d74deb82-96f3-4cd2-a624-787aa0830fee.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=202&id=ub5453842&margin=%5Bobject%20Object%5D&name=image.png&originHeight=345&originWidth=539&originalType=binary&ratio=1&rotation=0&showTitle=false&size=22523&status=done&style=none&taskId=u49c72a44-21b5-4cbb-a37b-b0502f9c70f&title=&width=316.3333435058594)

4.4.3.2,* 在publisher中编写方法,向itcast.topic发送消息:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Jztice5
 * @date 2022年05月09日 9:54
 */

@RunWith (SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    //注入RabbitTemplate
    @Autowired (required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    @Test
    public void testTopicExchange () {
        //交换机名称
        String exchangeName = "itcast.topic";
        //消息
        String msg = "ohhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh";

        //还要设置routingKey
        rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);

    }


}
  - **消息接收的情况举例:**
     - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25975946/1652195410210-3097c008-fac6-4d1b-aa13-108a8c8cd976.png#clientId=u43fed83b-149d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=118&id=u76b219b2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=177&originWidth=509&originalType=binary&ratio=1&rotation=0&showTitle=false&size=16068&status=done&style=none&taskId=u5dff2ca3-2df7-483d-83b8-6c792810635&title=&width=339.3333333333333)
发送的消息 目标队列
china.news 两个都接收到
dada.news 队列2收到
china.dada 队列1收到

4.5,AMQP的注解:

注解 作用
@RabbitListener 声明(设置)绑定关系,交换机名称和类型,消息队列名称,RoutingKey值(队列)

* 其他版本的MQ中间件同理;