1.提出问题

1.当硬件条件有限时如何提高吞吐量? MQ消息队列:把接受的请求先放在队列中,不处理

2..名词解释

同步通讯:就像打电话,需要实时响应。

优点:
    - 时效性较强,可以立即得到结果
缺点:
    - 耦合度高
    - 性能和吞吐能力下降
    - 有额外的资源消耗
    - 有级联失败问题
异步通讯:就像发邮件,不需要马上回复。

好处:
    - 吞吐量提升:无需等待订阅者处理完成,响应更快速
    - 故障隔离:服务没有直接调用,不存在级联失败问题
    - 调用间没有阻塞,不会造成无效的资源占用
    - 耦合度极低,每个服务都可以灵活插拔,可替换
    - 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:
    - 架构复杂了,业务没有明显的流程线,不好管理
    - 需要依赖于Broker的可靠、安全、性能

3.MQ概述

:::info MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。 :::

1.常见的MQ

image.png

2.MQ执行原理

image.png

4.安装RabbitMQ

安装链接

5.RabbitMQ消息模型

image.png

1.原始实现代码

<!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

6.SpringAMQP

SpringAMQP官网 :::info SpringAMQP是基于RabbitMQ封装的一套模板,并且利用SpringBoot对其实现了自动装配

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息 :::

    1.SpringAMQP使用配置

    1.父工程中引入依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2.配置JSON转换器(发送和接收端都得配置)

    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-xml</artifactId>
      <version>2.9.10</version>
    </dependency>
    
    @Bean
    public MessageConverter jsonMessageConverter(){
      return new Jackson2JsonMessageConverter();
    }
    

    3.在publisher(发送)服务和consumer(接收)服务的application.yml中添加配置:

    spring:
    rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: itcast # 用户名
      password: 123321 # 密码
    
          listener: #接收端能者多劳配置 大多用于任务队列模式
        simple:
          prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    

    2.SpringAMQP的消息模型使用(单队列省略交换机)

    1..Basic Queue 简单队列模型(一个消费者绑定一个队列)

    image.png

    @Autowired
      private RabbitTemplate rabbitTemplate;
    
      @Test
      public void testSimpleQueue() {
          // 队列名称
          String queueName = "simple.queue";
          // 消息
          String message = "hello, spring amqp!";
          // 发送消息
          rabbitTemplate.convertAndSend(queueName, message);
      }
    
    @Component
    public class SpringRabbitListener {
    
      @RabbitListener(queues = "simple.queue")
      public void listenSimpleQueueMessage(String msg) throws InterruptedException {
          System.out.println("spring 消费者接收到消息:【" + msg + "】");
      }
    }
    

    2.Work queues 任务队列模型(多个消费者绑定到一个队列

    image.png

    spring:
    rabbitmq:
      listener:
        simple:
          prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    

    ```java @Autowired private RabbitTemplate rabbitTemplate;

@Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = “simple.queue”; // 消息 String message = “hello, message_”; for (int i = 0; i < 50; i++) { // 发送消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }

```java
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

3.发布/订阅(Publish /Subscribe)模型

image.png :::info

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,订阅队列
  • Queue:消息队列,接收消息、缓存消息 :::

    注意:Spring提供了一个接口Exchange,来表示所有不同类型的交换机

    image.png

    1.Fanout Exchange 广播(消息发布到交换机,交换机分发到每个队列中)

    image.png :::info 广播模式下,消息发送流程:type=ExchangeTypes.FANOUT

  • 1 可以有多个队列

  • 2 每个队列都要绑定到Exchange(交换机)
  • 3 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4 交换机把消息发送给绑定过的所有队列
  • 5 订阅队列的消费者都能拿到消息 ::: ```java @Autowired private RabbitTemplate rabbitTemplate;

@Test public void testFanoutExchange() { // 队列名称 String exchangeName = “itcast.fanout”; // 消息 String message = “hello, everyone!”; rabbitTemplate.convertAndSend(exchangeName, “”, message); }

```java
//配置交换机绑定队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}


//接受消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue1"),
    exchange = @Exchange(name = "itcast.fanout", type = ExchangeTypes.FANOUT),
    key = {"red", "blue"}
))
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue2"),
    exchange = @Exchange(name = "itcast.fanout", type = ExchangeTypes.FANOUT),
    key = {"red", "blue"}
))
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

2.Direct Exchange 路由(只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息)

image.png :::info Direct模型下:type = ExchangeTypes.DIRECT

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息 :::
    @Test
    public void testSendDirectExchange() {
      // 交换机名称
      String exchangeName = "itcast.direct";
      // 消息
      String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
      // 发送消息
      rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
    
    ```java @RabbitListener(bindings = @QueueBinding( value = @Queue(name = “direct.queue1”), exchange = @Exchange(name = “itcast.direct”, type = ExchangeTypes.DIRECT), key = {“red”, “blue”} )) public void listenDirectQueue1(String msg){ System.out.println(“消费者接收到direct.queue1的消息:【” + msg + “】”); }

@RabbitListener(bindings = @QueueBinding( value = @Queue(name = “direct.queue2”), exchange = @Exchange(name = “itcast.direct”, type = ExchangeTypes.DIRECT), key = {“red”, “yellow”} )) public void listenDirectQueue2(String msg){ System.out.println(“消费者接收到direct.queue2的消息:【” + msg + “】”); }

<a name="gOzFw"></a>
#### 3.Topic    Exchange 主题(Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!)
![image.png](https://cdn.nlark.com/yuque/0/2022/png/29328734/1656161526435-def2b0c1-3aa0-4fcf-bff1-af9bcdc715a3.png#clientId=u08583f96-3b0d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=311&id=ua9a5ee78&margin=%5Bobject%20Object%5D&name=image.png&originHeight=343&originWidth=1016&originalType=binary&ratio=1&rotation=0&showTitle=false&size=127789&status=done&style=none&taskId=u9c0726e5-94b7-41d3-9f1e-bca9f198bf9&title=&width=920.150976510353)
:::info
Topic模式下:type = ExchangeTypes.TOPIC<br />Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert<br />通配符规则:<br />#:匹配一个或多个词        item.#:能够匹配item.spu.insert 或者 item.spu<br />*:匹配不多不少恰好1个词    item.*:只能匹配item.spu
:::
```java
/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

7.消息转换器

:::info Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题

  • 数据体积过大
  • 有安全漏洞
  • 可读性差 ::: image.png

    1.配置JSON转换器

    1.在publisher和consumer两个服务中都引入jackson依赖

    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-xml</artifactId>
      <version>2.9.10</version>
    </dependency>
    

    2.添加一个jsonMessageConverter的Bean即可

    @Bean
    public MessageConverter jsonMessageConverter(){
      return new Jackson2JsonMessageConverter();
    }