同步通讯和异步通讯特点

  • 同步通讯:就像舔狗聊天,需要实时响应。
  • 异步通讯:就像海王聊天,不需要马上回复。

image.png

同步通讯

特点

image.png

总结

  • 同步调用的优点:
    • 时效性较强,可以立即得到结果
  • 同步调用的问题:

    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

      异步通讯

      优点

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

    补充

  • 在大并发(高并发)的环境下,如何优化服务

    • 限流
    • 削峰
    • 资金充裕的情况下也可以搭建集群

      缺点

  • 架构复杂了,业务没有明显的流程线,不好管理

  • 需要依赖于Broker的可靠、安全、性能

    MQ的技术选型

    技术对比

    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
    比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ
  • RocketMQ
  • Kafka

    常见MQ的对比

    | | RabbitMQ | ActiveMQ | RocketMQ | Kafka | | —- | —- | —- | —- | —- | | 公司/社区 | Rabbit | Apache | 阿里 | Apache | | 开发语言 | Erlang | Java | Java | Scala&Java | | 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 | | 可用性 | | 一般 | | | | 单机吞吐量 | 一般 | | | 非常高 | | 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 | | 消息可靠性 | | 一般 | | 一般 |

各自的优点

  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ
  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

image.png
image.png

快速入门RabbitMQ

MQ的基本结构

image.png
RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

    安装RabbitMQ

    在线拉取

    1. docker pull rabbitmq:3.8-management

    创建MQ容器

    docker run \
    -e RABBITMQ_DEFAULT_USER=itcast \
    -e RABBITMQ_DEFAULT_PASS=itcast142 \
    -v mq-plugin:/plugins \
    --name mq \
    --hostname mq \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3.8-management
    

    RabbitMQ服务结构体系image.png

    入门案例

    导入Demo工程

    image.png

  • 包括三部分:

    • mq-demo:父工程,管理项目依赖
    • publisher:消息的发送者
    • consumer:消息的消费者

      简单队列模式的模型图

      image.png
      官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
  • publisher:消息发布者,将消息发送到队列queue

  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

    publisher实现

  • 思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 发送消息
    • 关闭连接和channel
  • 代码实现: ```java package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test;

import java.io.IOException; import java.util.concurrent.TimeoutException;

public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost(“192.168.94.128”); factory.setPort(5672); factory.setVirtualHost(“/“); factory.setUsername(“itcast”); factory.setPassword(“itcast142”); // 1.2.建立连接 Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.发送消息
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

    // 5.关闭通道和连接
    channel.close();
    connection.close();

}

}

<a name="rW6JZ"></a>
#### consumer实现

- **代码思路:**
   - **建立连接**
   - **创建Channel**
   - **声明队列**
   - **订阅消息**
- **代码实现:**
```java
package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.94.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("itcast142");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

总结

  • 基本消息队列的消息发送流程:
    • 建立connection
    • 创建channel
    • 利用channel声明队列
    • 利用channel向队列发送消息
  • 基本消息队列的消息接收流程:

    • 建立connection
    • 创建channel
    • 利用channel声明队列
    • 定义consumer的消费行为handleDelivery()
    • 利用channel将消费者与队列绑定

      AMQP和SpringAMQP说明和关系

      AMQP

      image.png

      SpringAMQP

      SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
      image.png

      关系

  • AMQP实际上就是一个协议,制定了一个规则

  • RabbitMQ对AMQP进行了实现
  • SpringAMQP实际上就是对AMQP进行了二次包装,简化开发过程,使得更好的进行使用

    RabbitMQ队列模式

    image.png

    SpringAMQP如何实现各个队列模式

    Basic Queue 简单队列模型

  • 在父工程mq-demo中引入依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    消息发送

  • 首先配置MQ地址,在publisher服务的application.yml中添加配置:

    spring:
    rabbitmq:
      host: 192.168.94.128 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: itcast # 用户名
      password: itcast142 # 密码
    
  • 然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送: ```java package cn.itcast.mq.helloworld;

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: 测试 */ @SpringBootTest public class SpringAmqpTest {

    / Rabbit设计模式(模板) / @Autowired private RabbitTemplate rabbitTemplate;

    @Test public void testBasicQueue() {

     /**
      * 参数:
      *    1.队列名称
      *    2.消息
      */
     rabbitTemplate.convertAndSend("simple.queue","hello, rabbitmq!");
    

    } }

![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654086542469-1bf65dc3-b9a4-48da-81be-f12b1d656776.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=465&id=u68f359a6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=465&originWidth=1377&originalType=binary&ratio=1&rotation=0&showTitle=false&size=967119&status=done&style=none&taskId=u6fd2a2ef-ed59-471e-9787-5dda127b705&title=&width=1377)
<a name="E7vxx"></a>
#### 消息接收

- **首先配置MQ地址,在consumer服务的application.yml中添加配置:**
```java
spring:
  rabbitmq:
    host: 192.168.94.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: itcast142 # 密码
  • 然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下: ```java package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: */ @Slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = {“simple.queue”}) public void testBasicQueue(String msg) {
     System.out.println("spring 消费者接收到消息:【" + msg + "】");
    
    } }
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654086925346-b0098217-e075-4e3e-bfc1-e805cc096d0c.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=667&id=u47a1a515&margin=%5Bobject%20Object%5D&name=image.png&originHeight=667&originWidth=835&originalType=binary&ratio=1&rotation=0&showTitle=false&size=795105&status=done&style=none&taskId=ucc953f3a-72e9-463f-b620-9a38e2af924&title=&width=835)
<a name="Nt0rB"></a>
### WorkQueue
**Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。**<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654087023555-1460b1b2-c735-4579-b1f4-279fb72d7dd1.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=220&id=udd4cfbfd&margin=%5Bobject%20Object%5D&name=image.png&originHeight=220&originWidth=589&originalType=binary&ratio=1&rotation=0&showTitle=false&size=23656&status=done&style=none&taskId=u7b3719d0-8d89-40bb-8eec-7ee0f1a42de&title=&width=589)<br />**处理消息的方式:并不是同时接收,而是一人一次**
<a name="i7YIi"></a>
#### 测试案例说明
**在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue在consumer服务中定义两个消息监听者,都监听simple.queue队列消费者1每秒处理50条消息,消费者2每秒处理10条消息**
<a name="nEWX5"></a>
#### 消息发送

- **这次我们循环发送,模拟大量消息堆积现象。在publisher服务中的SpringAmqpTest类中添加一个测试方法:**
```java
package cn.itcast.mq.helloworld;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description: 测试
 */
@SpringBootTest
public class SpringAmqpTest {

    /* Rabbit设计模式(模板) */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testBasicQueue() {

        /**
         * 参数:
         *    1.队列名称
         *    2.消息
         */
        rabbitTemplate.convertAndSend("simple.queue","hello, rabbitmq!");
    }

    /**
     * WorkQueue的测试
     */
    @Test
    public void testBasicQueue1() throws InterruptedException {
        for (int i = 1; i <= 50; i++) {
            String msg = "工作队列模式,消息发送_";
            /**
             * 参数:
             *    1.队列名称
             *    2.消息
             */
            rabbitTemplate.convertAndSend("simple.queue",msg+i);
            //保证每秒50条
            Thread.sleep(20);
        }

    }
}

消息接收

  • 要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法: ```java package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: */ @Slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = {“simple.queue”}) public void testBasicQueue(String msg) {

     System.out.println("amqp消费者-1--接收到消息:【" + msg + "】");
    

    }

    @RabbitListener(queues = {“simple.queue”}) public void testBasicQueue1(String msg) {

     System.err.println("amqp消费者-2--接收到消息:【" + msg + "】");
    

    } }

<a name="Nig3e"></a>
#### 测试结果
```java
amqp消费者-1--接收到消息:【工作队列模式,消息发送_1】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_2】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_3】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_5】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_4】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_7】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_9】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_11】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_6】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_13】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_15】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_8】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_17】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_19】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_10】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_21】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_23】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_25】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_12】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_27】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_29】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_14】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_31】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_33】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_16】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_35】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_37】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_39】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_18】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_41】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_43】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_20】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_45】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_47】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_22】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_49】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_24】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_26】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_28】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_30】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_32】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_34】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_36】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_38】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_40】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_42】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_44】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_46】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_48】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_50】
  • 可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
  • 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

    能者多劳

  • 在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

    spring:
    rabbitmq:
      listener:
        simple:
          prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    

    实现了能者多劳
    image.png

    总结

    Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

    发布/订阅模式

    image.png

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力

Fanout

image.png
在广播模式下,消息发送流程是这样的:

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息,同时接收

    实现步骤

  • 创建一个交换机 itcast.fanout,类型是Fanout

  • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout

image.png

具体实现

声明队列和交换机
  • Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

image.png

  • 在consumer中创建一个类,声明队列和交换机: ```java package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: */ @Configuration public class FanoutConfig {

    / 第一个队列 / @Bean public Queue fanoutQueue1() {

     return new Queue("fanout.queue1");
    

    }

    / Fanout交换机 / @Bean public FanoutExchange fanoutExchange() {

     return new FanoutExchange("itcast.fanout");
    

    }

    /**

    • 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
    • @param fanoutQueue1
    • @param fanoutExchange
    • @return */ @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }

      / 第二个队列 / @Bean public Queue fanoutQueue2() { return new Queue(“fanout.queue1”); }

/**
 * 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
 * @param fanoutQueue2
 * @param fanoutExchange
 * @return
 */
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

}

<a name="sGSjd"></a>
#### 消息发送

- **在publisher服务的SpringAmqpTest类中添加测试方法:**
```java
package cn.itcast.mq.helloworld;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description: 测试
 */
@SpringBootTest
public class SpringAmqpTest {

    /**
     * Fanout的测试
     */
    @Test
    public void testFanoutQueue() throws InterruptedException {
        /**
         * 参数:
         *    1.队列名称
         *    2.路由key:由于是fanout模式。所以要给""
         *    3.消息
         */
        rabbitTemplate.convertAndSend("itcast.fanout","","fanout 模式消息发送!");
    }
}

image.png

消息接收

  • 在consumer服务的SpringRabbitListener中添加两个方法,作为消费者: ```java package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: */ @Slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = {“fanout.queue1”}) public void testFanoutQueue1(String msg) throws InterruptedException {
     System.err.println("Fanout消费者-1--接收到消息:【" + msg + "】");
    
    } @RabbitListener(queues = {“fanout.queue2”}) public void testFanoutQueue2(String msg) throws InterruptedException {
     System.err.println("Fanout消费者-2--接收到消息:【" + msg + "】");
    
    } }
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654089975449-7a8ffaf3-219f-493b-bd1b-5b06e9d5f856.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=480&id=ua5e22c34&margin=%5Bobject%20Object%5D&name=image.png&originHeight=480&originWidth=815&originalType=binary&ratio=1&rotation=0&showTitle=false&size=608537&status=done&style=none&taskId=u00b7516a-7393-47e2-9546-75bf70b4f42&title=&width=815)
<a name="mCdK3"></a>
#### 总结

- **交换机的作用是什么?**
   - **接收publisher发送的消息**
   - **将消息按照规则路由到与之绑定的队列**
   - **不能缓存消息,路由失败,消息丢失**
   - **FanoutExchange的会将消息路由到每个绑定的队列**
- **声明队列、交换机、绑定关系的Bean是什么?**
   - **Queue**
   - **FanoutExchange**
   - **Binding**
<a name="WcSkS"></a>
### Direct
**Direct可以对不同的消息被不同的队列消费,不同于Fanout一条消息给所有地方队列都消费**<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654090040373-c7c11241-01d2-4dd8-814e-b943402096fb.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=179&id=ub1b3ec23&margin=%5Bobject%20Object%5D&name=image.png&originHeight=179&originWidth=589&originalType=binary&ratio=1&rotation=0&showTitle=false&size=31253&status=done&style=none&taskId=u460f2672-a5e6-4e4a-9ada-04e830f5773&title=&width=589)

- **队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)**
- **消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。**
- **Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息**
<a name="ZsC49"></a>
#### 实践案例需求

- **利用@RabbitListener声明Exchange、Queue、RoutingKey**
- **在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2**
- **在publisher中编写测试方法,向itcast. direct发送消息**

![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654090239530-cffab4a7-910d-4942-9379-a82d4818b1ec.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=216&id=u17e092d5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=216&originWidth=589&originalType=binary&ratio=1&rotation=0&showTitle=false&size=39050&status=done&style=none&taskId=u969822db-4925-4c5c-9e3e-8c500132ec2&title=&width=589)
<a name="J3pKC"></a>
#### 交换机和队列声明的模式
<a name="SkZUD"></a>
##### 配置类声明
```java
package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description:
 */
@Configuration
public class DirectConfig {

    /* 第一个队列 */
    @Bean
    public Queue directQueue1() {
        return new Queue("direct.queue1");
    }

    /* Fanout交换机 */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("itcast.direct");
    }

    /**
     * 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
     * @param directQueue1
     * @param directExchange
     * @return
     */
    @Bean
    public Binding directBinding1(Queue directQueue1,DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
}

注解声明
package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description:
 */
@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(
            bindings = {@QueueBinding(
                    value = @Queue("direct.queue1"),
                    exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
                    key = {"red","blue"}
            )}
    )
    public void testDirectQueue1(String msg) throws InterruptedException {
        System.err.println("Fanout消费者-1--接收到消息:【" + msg + "】");
    }

    @RabbitListener(
            bindings = {@QueueBinding(
                    value = @Queue("direct.queue2"),
                    exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
                    key = {"red","yellow"}
            )}
    )
    public void testDirectQueue2(String msg) throws InterruptedException {
        System.err.println("Fanout消费者-2--接收到消息:【" + msg + "】");
    }
}

image.png

消息发送
  • 在publisher服务的SpringAmqpTest类中添加测试方法 ```java package cn.itcast.mq.helloworld;

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: 测试 / @SpringBootTest public class SpringAmqpTest { /*
    • Direct的测试 / @Test public void testDirectQueue() throws InterruptedException { /*
      • 参数:
      • 1.队列名称
      • 2.路由key:由于是fanout模式。所以要给””
      • 3.消息 */ rabbitTemplate.convertAndSend(“itcast.direct”,”blue”,”direct 模式消息发送!”); } }
<a name="TYy8u"></a>
#### 总结

- **描述下Direct交换机与Fanout交换机的差异?**
   - **Fanout交换机将消息路由给每一个与之绑定的队列**
   - **Direct交换机根据RoutingKey判断路由给哪个队列**
   - **如果多个队列具有相同的RoutingKey,则与Fanout功能类似**
- **基于@RabbitListener注解声明队列和交换机有哪些常见注解?**
   - **@Queue**
   - **@Exchange**
<a name="HPUG3"></a>
### Topic
<a name="mFxMj"></a>
#### 说明
**Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!**<br />**Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: <br />**<br />**#:匹配一个或多个词**<br />***:匹配不多不少恰好1个词**<br />**举例:**<br />**item.#:能够匹配item.spu.insert 或者 item.spu**<br />**item.*:只能匹配item.spu**<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654091693329-6b27b816-f28a-41e4-8185-140cf8d456fc.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=194&id=u797fb19c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=194&originWidth=589&originalType=binary&ratio=1&rotation=0&showTitle=false&size=51355&status=done&style=none&taskId=u36d2698a-38f8-4e48-9d27-0745f32f0c4&title=&width=589)

- **Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather**
- **Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news**
<a name="rfg23"></a>
#### 案例需求与思路
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1799609/1654091934275-d4ca13b2-0e79-40f4-808d-fd287fbd96c4.png#clientId=uf9038602-139e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=162&id=uac45d241&margin=%5Bobject%20Object%5D&name=image.png&originHeight=162&originWidth=589&originalType=binary&ratio=1&rotation=0&showTitle=false&size=33536&status=done&style=none&taskId=u5184976c-ee61-4963-94ed-9f5b660c60f&title=&width=589)

- **并利用@RabbitListener声明Exchange、Queue、RoutingKey**
- **在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2**
- **在publisher中编写测试方法,向itcast. topic发送消息**
<a name="jbM1c"></a>
#### 案例实现
配置
```java
package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description:
 */
@Configuration
public class TopicConfig {

    /* 第一个队列 */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    /* Fanout交换机 */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("itcast.topic");
    }

    /**
     * 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
     * @param topicQueue1
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding topicBinding1(Queue topicQueue1,TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#");
    }

    /* 第二个队列 */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }


    /**
     * 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
     * @param topicQueue2
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news");
    }
}

消息发送
  • 在publisher服务的SpringAmqpTest类中添加测试方法 ```java package cn.itcast.mq.helloworld;

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;

/**

  • @author: Mr.W
  • @date: 2022/6/1
  • @description: 测试 / @SpringBootTest public class SpringAmqpTest { /*
    • topic的测试 / @Test public void testTopicQueue() throws InterruptedException { /*
      • 参数:
      • 1.队列名称
      • 2.路由key
      • 3.消息 */ rabbitTemplate.convertAndSend(“itcast.direct”,”china.news”,”direct 模式消息发送!”); } }
<a name="YwD2f"></a>
##### 消息接收

- **在consumer服务的SpringRabbitListener中添加方法**
```java
package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: Mr.W
 * @date: 2022/6/1
 * @description:
 */
@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = {"topic.queue1"})
    public void testTopicQueue1(String msg) throws InterruptedException {
        System.err.println("Topic消费者-1--接收到消息:【" + msg + "】");
    }
    @RabbitListener(queues = {"topic.queue2"})
    public void testTopicQueue2(String msg) throws InterruptedException {
        System.err.println("Topic消费者-2--接收到消息:【" + msg + "】");
    }
}

image.png

总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

    消息转换器

    SpringAMQP默认对Java转换

  • 我们修改消息发送的代码,发送一个Map对象:

    @Test
    public void testSendMap() throws InterruptedException {
      // 准备消息
      Map<String,Object> msg = new HashMap<>();
      msg.put("name", "Jack");
      msg.put("age", 21);
      // 发送消息
      rabbitTemplate.convertAndSend("simple.queue","", msg);
    }
    

    如何改变消息转换器——配置JSON转换器

    导入依赖

  • 在publisher和consumer两个服务中都引入依赖:

    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-xml</artifactId>
      <version>2.9.10</version>
    </dependency>
    

    配置消息转换器

    在启动类中添加一个Bean即可:

    @Bean
    public MessageConverter jsonMessageConverter(){
      return new Jackson2JsonMessageConverter();
    }