MQ(实用篇-RabbitMQ)

同步通讯和异步通讯(like 电话vs微信)

同步调用优点:

  1. 时效性高

同步调用问题:

  1. 耦合度高
  2. 性能下降
  3. 资源浪费
  4. 级联失败

异步调用常见实现是事件驱动模式
优点:

  1. 耦合度低
  2. 吞吐量提升
  3. 故障隔离
  4. 流量削峰

缺点:

  1. 依赖Broker可靠性、安全性、吞吐能力
  2. 架构复杂,业务无明显流程线,不好追踪管理

    RabbitMQ

    虚拟机+CentOS7

    概念

  • channel:操作MQ工具
  • exchange:路由消息到队列
  • queue:缓存消息
  • virtual host:虚拟主机,对queue、exchange等资源的逻辑分组

常见消息模型

  • 基本消息队列
  • 工作消息队列
  • 发布订阅

基本消息队列发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

    SpringAMQP

    作用:大大简化消息发送接收api
    What’s AMQP?
    Advanced Message Queuing Protocol,应用程序或之间传递业务消息的开放标准。
    Spring AMQP?
    基于AMQP协议定义的一套API规范,提供模板发送和接收消息。

消息发送流程

  1. 引入spring-amqp依赖
  2. 配置RabbitMQ地址
  3. 利用RabblitTemplate的convertAndSend方法

    消息接收流程

  4. 引入spring-amqp依赖

  5. 配置RabbitMQ地址
  6. 定义类,添加@Component注解
  7. 类中声明方法,添加@RabbitListener注解,方法参数就是消息

WorkQueue

(一个队列绑定多个消费者)
基本思路:

  1. 在publisher服务定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务定义两个监听者,都监听simlple.queue
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

消费预取限制:修改application.yml文件,设置preFetch值,控制上限

发布订阅模型介绍

允许将同意消息发送给多个消费者,实现是加入exchange(交换机)。

ps:RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储,下面介绍其三种Exchange模式。

FanoutExchange

作用:将消息路由到每个绑定的队列,称为广播模式
实现思路

  1. 在consumer服务中,声明队列、交换机,绑定两者
  2. 在consumer服务中,编写两个消费者方法,监听fanout.q1,fanout.q2
  3. 在publisher中编写测试方法,向itcast.fanout发送消息

    DirectExchange

    作用:将接收到的消息根据规则指定到Queue,称为路由模式
  • 每个Queue与Exchange设置一个BindingKey
  • 发布者发送消息,指定消息的RoutingKey
  • Exchange对BindingKey和Routing进行配对

实现思路

  1. 利用@RabbitListenter声明Exchange、Queue、RoutingKey
  2. 在cousumer服务中,编写两个消费方法,分别监听direct.queue1和direct.queue2
  3. 在publisher中编写测试方法,向itcast.direct发送消息
  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "direct.queue1"),
  3. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
  4. key = {"red", "blue"}
  5. ))

TopicExchange

与DirectExchange类似,区别是routingKey必须是多个单词的列表,以.分割
可以使用通配符
#:代指0或多个单词
*:代指一个单词

消息转换器

作用:JSON序列化
步骤

  1. 引入依赖
  2. 在publisher/consumer服务声明MessageConverter