同步通讯和异步通讯

同步通讯,比如打电话,利用feign组件远程调用其他服务
优点:
•时效性较强,可以立即得到结果
缺点
•耦合度高
•性能和吞吐能力下降
•有额外的资源消耗
•有级联失败问题

异步通信,发送方与接收方之间间多了一个中间件
好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速
  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

MQ的基本介绍
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。是提供应用于应用间消息通信的软件。

快速安装 mq
1.1在线拉取 docker pull rabbitma:3.8-management
1.2本地上传mq.tar镜像包
加载 docker load -i mq.tar
2.执行安装命令
创建运行MQ容器,并创建用户和密码
docker run \
> -e RABBITMQ_DEFAULT_USER=itcast \
> -e RABBITMQ_DEFAULT_PASS=123321 \
> -v mq-plugins:/plugins \
> —name mq \
> —hostname mq \
> -p 15672:15672 \
> -p 5672:5672 \
> -d \
> rabbitmq:3.8-management
3.进入主页:http://192.168.200.130:15672/

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • channel:操作MQ的工具:消息生产者在建立连接的时候connection对象里有多个channel 信道对象(了解)
  • exchange:交换机,负责消息路由到队列中
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
  • AMQP协议:高级消息队列传输协议
  • channel

image.png
看不看无所谓,凑字数的,springAMQP封装了这些玩意
基本消息队列的消息发送流程:
1.建立connection
2.创建channel
3.利用channel声明队列
4.利用channel向队列发送消息
基本消息队列的消息接收流程:
1.建立connection
2.创建channel
3.利用channel声明队列
4.定义consumer的消费行为handleDelivery()
5.利用channel将消费者与队列绑定

什么是SpringAMQP?
AMQP 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。(应用间消息通信的一种协议)
Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
•Basic Queue 简单队列模型 一生产者 一消费者
SpringAMQP如何发送消息?
•引入amqp的starter依赖
•配置RabbitMQ地址
•利用RabbitTemplate的convertAndSend方法

SpringAMQP如何接收消息?
•引入amqp的starter依赖
•配置RabbitMQ地址
•定义类,添加@Component注解
•类中声明方法,添加@RabbitListener注解,方法参数就时消息
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

•Work Queue 工作队列模型 一生产者 多消费者
1.编写两个消费者,都监听simple.queue
模拟延迟一个 Thread.sleep(25) 一个Thread.sleep(100)
2.先开启消费者启动类
3.生产者循环发送50条消息到队列
结果延迟小的先接受完消息,平均每个消费者接受25条消息
image.png
优化方案
修改consumer服务的application.yml文件,添加配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
能者多劳
image.png
•多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
•通过设置prefetch来控制消费者预取的消息数量

发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)
常见exchange类型包括:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队

交换机的作用是什么?

•接收publisher发送的消息
•将消息按照规则路由到与之绑定的队列
•不能缓存消息,路由失败,消息丢失
•FanoutExchange的会将消息路由到每个绑定的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

声明队列、交换机、绑定关系的Bean是什么?
•Queue
•FanoutExchange
•Binding
发布订阅模式
Fanout模式(将消息路由给每一个与之绑定的队列)

FanoutExchange 会将接收到的消息广播到每一个跟其绑定的queue

1.写一个配置类(在consumer服务声明Exchange、Queue、Binding)
image.png
编写两个消费者
启动生产者测试类向队列中发送消息
结果两个消费者都获得了同一条消息

Direct模型(根据RoutingKey判断路由给哪个队列)
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明

1.消费者image.png
2.生产者
发送消息的时候多了一个 routingKey属性
image.png

Topic 模型

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • :代表0个或多个词

  • *:代表1个词

消息转换器

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

解决方法(优化)
配置JSON转换器
在publisher和consumer两个服务中都引入依赖:或者直接在父工程里引入依赖

com.fasterxml.jackson.dataformat
jackson-dataformat-xml
2.9.10

在publisher和consumer两个服务 的 启动类中添加一个Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

默认的jdk序列化器
image.png
json序列化器
image.png
思考面试题:
1.什么是MQ?
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。是提供应用于应用间消息通信的软件。

2.MQ的应用场景?

应用解耦(异步)

系统之间进行数据交互的时候,在时效性和稳定性之间我们都需要进行选择。基于线程的异步处理,能确保用户体验,但是极端情况下可能会出现异常,影响系统的稳定性,而同步调用很多时候无法保证理想的性能,那么我们就可以用MQ来进行处理。

通知

这里就用到了前文一个重要的特点,发布订阅,下游系统一直在监听MQ的数据,如果MQ有数据,下游系统则会按照 先进先出 这样的规则, 逐条进行消费 ,而上游系统只需要将数据存入MQ里,这样就既降低了不同系统之间的耦合度,同时也确保了消息通知的及时性,而且也不影响上游系统的性能。

限流

上文有说了一个非常重要的特性,MQ 数据是只有一条数据在使用中。 在很多存在并发,而又对数据一致性要求高,而且对性能要求也高的场景,如何保证,那么MQ就能起这个作用了。不管多少流量进来,MQ都会让你遵守规则,排除处理,不会因为其他原因,导致并发的问题,而出现很多意想不到脏数据。

数据分发

MQ的发布订阅肯定不是只是简单的一对一,一个上游和一个下游的关系,MQ中间件基本都是支持一对多或者广播的模式,而且都可以根据规则选择分发的对象。这样上游的一份数据,众多下游系统中,可以根据规则选择是否接收这些数据,这样扩展性就很强了。 PS:上文中的上游和下游,在MQ更多的是叫做生产者(producer)和消费者(consumer)。

分布式事务

解决分布式事务有一个比较容易理解的方案,就是二次提交。基于MQ的特点,MQ作为二次提交的中间节点,负责存储请求数据,在失败的情况可以进行多次尝试,或者基于MQ中的队列数据进行回滚操作,是一个既能保证性能,又能保证业务一致性的方案,当然,这个方案的主要问题就是定制化较多,有一定的开发工作量

3.常见的mq产品?
Kafka、ActiveMQ、RabbitMQ、RocketMQ。
image.png
4.什么是AMQP协议模型?
是一个进程间传递异步消息网络协议

5.rabbitmq支持哪些消息模式?
Basic Queue 简单队列模型 一个消费者绑定一个队列和一个生产者
WorkQueue 任务模型 让多个消费者绑定到一个队列,共同消费队列中的消息,同一条消息只能让一个消费者接受,默认平均分配,可以设置预获取消息的数量
Fanout 和WorkQueue不同,中间多了一个交换机,用来接收消息并分发给队列,建立绑定关系 同一条消息可以让多个消费者接受
Direct 和Fanout不同,需要绑定key,通过key来判断是否传递
Topic 与Direct相比,key中多了通配符# 或 *

6.如何使用RabbitMQ收发消息?(基于SpringAMQP)?
引入amqp依赖
配置MQ地址
自动装配 RabbitTemplate 对象 调用 converAndSend方法 发送消息
在消费者服务中创建一个类
@RabbitListener声明消费者
运行 消费者启动类接收消息
7。如何项目中声明交换机、队列及绑定关系?

  1. 新建一个配置类

    @Bean 声明
    image.png
    2.基于注解直接在@RabbitListener注解中声明绑定关系

8.消息转换器的作用?
实现SpringAMQP中消息的序列化和反序列化
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象

9.如何修改MQ默认的消息转换器?
引入json序列化器依赖(生产者和消费者需要配置相同的序列化器,否则程序会报错)
启动类中@Bean 将序列化器对象添加到Spring容器