同步调用的优点:

  • 时效性较强,可以立即得到结果

    同步调用的问题:

  • 耦合度高

  • 性能下降
  • 额外资源消耗
  • 级联失败

    什么是MQ

  • 中午是消息队列,存放消息的队列。事件驱动架构中的Broker

    介绍消息队列的作用?

    好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

    MQ的应用场景

  • 提供应用与应用间消息通信的软件

    • 异步处理
    • 应用解耦
    • 流量削峰
    • 消息通知
  • 缺点

    • 系统可用性降低
    • 系统复制性增加

      常见的mq产品

      image.png

      什么是AMQP协议模型

  • 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求

    rabbitmq支持哪些消息模式

  • 简单模式

  • 工作队列模式
  • 发布订阅模式
  • 路由模式
  • 通配符模式

    如何使用RabbitMQ收发消息?(基于SpringAMQP)?

  • 发消息

    • 父工程中引入依赖

      1. <!--AMQP依赖,包含RabbitMQ-->
      2. <dependency>
      3. <groupId>org.springframework.boot</groupId>
      4. <artifactId>spring-boot-starter-amqp</artifactId>
      5. </dependency>
    • 在publisher服务的application.yml中添加配置

      spring:
      rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: itcast # 用户名
      password: 123321 # 密码
      
    • 注入RabbitTemplate发送消息

  • 收消息

    • 父工程中引入依赖

      <!--AMQP依赖,包含RabbitMQ-->
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      
    • 在consumer服务的application.yml中添加配置

      spring:
      rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机
      username: itcast # 用户名
      password: 123321 # 密码
      
    • 注入RabbitTemplate发送消息

      如何项目中声明交换机、队列及绑定关系?

  • 创建配置类config添加@Configuration+@Bean ```yaml 展开/收起节点 聚焦  全部评论0

 发表

@Configuration public class FanoutConfig { /**

 * 声明交换机
 * @return Fanout类型交换机
 */
@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange("itcast.fanout");
}
/**
 * 第1个队列
 */
@Bean
public Queue fanoutQueue1(){
    return new Queue("fanout.queue1");
}
/**
 * 绑定队列和交换机
 */
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
 * 第2个队列
 */
@Bean
public Queue fanoutQueue2(){
    return new Queue("fanout.queue2");
}
/**
 * 绑定队列和交换机
 */
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

}


- @RabbitListener中通过注解声明
```yaml
@RabbitListener(bindings = @QueueBinding(
            //消息队列
            value = @Queue(name = "topic.queue1"),
            //交换机队列
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            //通配符
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            //消息队列
            value = @Queue(name = "topic.queue2"),
            //交换机队列
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            //通配符
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

消息转换器的作用?

  • 让消息体积变小
  • 提高安全性
  • 提高性能

    如何修改MQ默认的消息转换器?

  • 在publisher和consumer两个服务中都引入依赖

    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-xml</artifactId>
      <version>2.9.10</version>
    </dependency>
    
  • 在启动类中添加一个Bean

    @Bean
    public MessageConverter jsonMessageConverter(){
      return new Jackson2JsonMessageConverter();
    }
    

    MQ高级:

    rabbitmq如何保证消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

    • 常见的丢失原因包括
      • 发送时丢失
        • 生产者发送的消息未送达exchange
        • 消息到达exchange后未到达queue
      • MQ宕机,queue将消息丢失
      • consumer接收到消息后未消费就宕机
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
    • 交换机持久化
    • 队列持久化
    • 消息持久化
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
    • 三种确认模式
      • manual:手动ack,需要在业务代码结束后,调用api发送ack
      • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
      • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

    • 含三种不同的实现
      • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
      • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

        rabbitmq如何解决百万消息堆积问题?

  • 解决消息堆积有两种思路

    • 队列上绑定多个消费者,提高消费速度
    • 使用惰性队列,可以再mq中保存更多消息
  • 惰性队列的优点有哪些
    • 基于磁盘存储,消息上限高
    • 没有间歇性的page-out,性能比较稳定
  • 惰性队列的缺点有哪些

    • 基于磁盘存储,消息时效性会降低
    • 性能受限于磁盘的IO
      // 惰性队列
      @Bean
      public Queue lazyQueue(){
      return QueueBuilder.durable("lazy.queue")
             .lazy()
             .build();
      }
      // 普通队列
      @Bean
      public Queue normalQueue(){
      return QueueBuilder.durable("normal.queue")
             .build();
      }
      

      rabbitmq如何防止消息重复消费?

  • 利用本地重试,而不是无限制的requeue到mq队列

  • 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理

    rabbitmq如何保证高可用?

  • MQ集群

    • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
    • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性
    • 仲裁队列:RabbitMQ的3.8版本以后,推出了新的功能,来代替镜像集群,底层采用Raft协议确保主从的数据一致性

      rabbitmq消息的重试机制?

  • 本地重试

    • 修改consumer服务的application.yml文件,添加内容

      spring:
      rabbitmq:
      listener:
       simple:
         retry:
           enabled: true # 开启消费者失败重试
           initial-interval: 1000ms # 初识的失败等待时长为1秒
           multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
           max-attempts: 3 # 最大重试次数
           stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
      
    • 结论

      • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
      • 重试达到最大次数后,Spring会返回ack,消息会被丢弃
  • 失败策略

    • 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现
      • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
      • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

        rabbitmq如何实现延迟消费?

  • 死信交换机

    • 消息被消费者reject或者返回nack
    • 消息超时未消费
    • 队列满了
  • TTL 接收超时死信的死信交换机
    • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
    • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
  • 延迟队列

    • 延迟队列的使用场景包括
      • 延迟发送短信
      • 用户下单,如果用户在15 分钟内未支付,则自动取消
      • 预约工作会议,20分钟后自动通知所有参会人员
    • 使用DelayExchange插件

      • DelayExchange需要将一个交换机声明为delayed类型,流程如下
        • 接收消息
        • 判断消息是否具备x-delay属性
        • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
        • 返回routing not found结果给消息发送者
        • x-delay时间到期后,重新投递消息到指定队列
      • 使用DelayExchange

        • 声明DelayExchange交换机

          • 基于注解方式(推荐)

            @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct",delayed = "true"),
            key = "delay"
            ))
            public void listenDelayedQueue(String msg){
            log.info("接收到 delay.queue的延迟消息:{}", msg);
            }
            
          • 也可以基于@Bean的方式

image.png

     - 发送消息
@Test
public void testDelayedMsg() {
    // 创建消息
    Message message = MessageBuilder
            .withBody("hello, delay message".getBytes(StandardCharsets.UTF_8))
            .setHeader("x-delay",10000)
            .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    log.debug("发送消息成功");
}

rabbitmq在项目中的使用场景?