异步通信-RabbitMQ

1、面临问题

  1. 之前的请求都是同步调用虽然时效性较强,可以立即得到结果
  2. 但是同步调用存在以下问题:
  3. 1.耦合度高
  4. 2.性能和吞吐能力下降
  5. 3.有额外的资源消耗
  6. 4.有级联失败问题

2、解决方案-RabbitMQ

2.1、简介

  1. MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
  2. 官网:https://www.rabbitmq.com/
  3. 参考:https://blog.csdn.net/kavito/article/details/91403659

2.2、主流产品比较

image.png

2.3、优缺点

【优点】

  1. 1.吞吐量提升:无需等待订阅者处理完成,响应更快速
  2. 2.故障隔离:服务没有直接调用,不存在级联失败问题
  3. 3.调用间没有阻塞,不会造成无效的资源占用
  4. 4.耦合度极低,每个服务都可以灵活插拔,可替换
  5. 5.流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

【缺点】

  1. 1.架构复杂了,业务没有明显的流程线,不好管理
  2. 2.需要依赖于Broker(MQ)的可靠、安全、性能

2.4、运行原理

image.png

2.5、运行流程

【生产者发送消息流程】

  1. 1、生产者和Broker建立TCP连接。
  2. 2、生产者和Broker建立通道。
  3. 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
  4. 4Exchange将消息转发到指定的Queue(队列)

【消费者接收消息流程】

  1. 1、消费者和Broker建立TCP连接
  2. 2、消费者和Broker建立通道
  3. 3、消费者监听指定的Queue(队列)
  4. 4、当有消息到达QueueBroker默认将消息推送给消费者。
  5. 5、消费者接收到消息。
  6. 6ack回复

3、使用步骤

3.1、导入依赖

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

3.2、配置

    spring:
      rabbitmq:
        host: 192.168.248.222 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码

3.3、BasicQueue 简单队列模型

    生产者----消费者

【publisher】
    @Autowired
    private RabbitTemplate rabbitTemplate;    

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

【consumer】
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException         {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }

3.4、WorkQueue 工作模式

    生产者----消费者1|消费者2

【publisher】
    /**
         * workQueue
         * 向队列中不停发送消息,模拟消息堆积。
         */
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

【consumer】
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }

【yml】
    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.5、发布/订阅模式

① Fanout-扇出/广播模式

@Configuration
public class FanoutConfig {

    // 声明交换机 Fanout类型交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    // 第1个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    // 绑定队列和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 第1个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    // 绑定队列和交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

【publisher】
    // fanout
    @Test
    public void testFanoutExchange() {
        // 队列名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

【consumer】
    // fanout
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }

② Direct-路由模式

【publisher】
     // direct
    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

【consumer】
    // direct
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

③ Topic-主题模式

【publisher】
    // topic
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "喜报!孙悟空大战哥斯拉,胜!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }

【consumer】
    // topic
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg) {
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg) {
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

3.6、消息转换器

① 消息是对象

    问题:当消息传递是对象时,会调用jdk的序列化,内存空间占用大

【publisher】
    // 测试jdk序列化
    @Test
    public void testSendMap() throws InterruptedException {
        // 准备消息
        Map<String,Object> msg = new HashMap<>();
        msg.put("name", "Jack");
        msg.put("age", 21);
        // 发送消息
        // messageConverter.toMessage(msg, msg);
        rabbitTemplate.convertAndSend("simple.queue","", msg);
    }

【consumer】
    // 测试jdk序列化
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(Map msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }

② 解决

    <!--json转换-->
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }