MQ(实用篇-RabbitMQ)
同步通讯和异步通讯(like 电话vs微信)
同步调用优点:
- 时效性高
同步调用问题:
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用常见实现是事件驱动模式
优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
缺点:
- channel:操作MQ工具
- exchange:路由消息到队列
- queue:缓存消息
- virtual host:虚拟主机,对queue、exchange等资源的逻辑分组
常见消息模型
- 基本消息队列
- 工作消息队列
- 发布订阅
基本消息队列发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
SpringAMQP
作用:大大简化消息发送接收api
What’s AMQP?
Advanced Message Queuing Protocol,应用程序或之间传递业务消息的开放标准。
Spring AMQP?
基于AMQP协议定义的一套API规范,提供模板发送和接收消息。
消息发送流程
- 引入spring-amqp依赖
- 配置RabbitMQ地址
利用RabblitTemplate的convertAndSend方法
消息接收流程
引入spring-amqp依赖
- 配置RabbitMQ地址
- 定义类,添加@Component注解
- 类中声明方法,添加@RabbitListener注解,方法参数就是消息
WorkQueue
(一个队列绑定多个消费者)
基本思路:
- 在publisher服务定义测试方法,每秒产生50条消息,发送到simple.queue
- 在consumer服务定义两个监听者,都监听simlple.queue
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
消费预取限制:修改application.yml文件,设置preFetch值,控制上限
发布订阅模型介绍
允许将同意消息发送给多个消费者,实现是加入exchange(交换机)。
ps:RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储,下面介绍其三种Exchange模式。
FanoutExchange
作用:将消息路由到每个绑定的队列,称为广播模式
实现思路
- 在consumer服务中,声明队列、交换机,绑定两者
- 在consumer服务中,编写两个消费者方法,监听fanout.q1,fanout.q2
- 在publisher中编写测试方法,向itcast.fanout发送消息
DirectExchange
作用:将接收到的消息根据规则指定到Queue,称为路由模式
- 每个Queue与Exchange设置一个BindingKey
- 发布者发送消息,指定消息的RoutingKey
- Exchange对BindingKey和Routing进行配对
实现思路
- 利用@RabbitListenter声明Exchange、Queue、RoutingKey
- 在cousumer服务中,编写两个消费方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,向itcast.direct发送消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
TopicExchange
与DirectExchange类似,区别是routingKey必须是多个单词的列表,以.分割
可以使用通配符
#:代指0或多个单词
*:代指一个单词
消息转换器
作用:JSON序列化
步骤