MQ

1.1.同步和异步通讯

  1. 同步调用的优点:- 时效性较强,可以立即得到结果
  2. 同步调用的问题:
  3. - 耦合度高
  4. - 性能和吞吐能力下降
  5. - 有额外的资源消耗
  6. - 有级联失败问题
  1. 异步通信
  2. 好处:
  3. - 吞吐量提升:无需等待订阅者处理完成,响应更快速
  4. - 故障隔离:服务没有直接调用,不存在级联失败问题
  5. - 调用间没有阻塞,不会造成无效的资源占用
  6. - 耦合度极低,每个服务都可以灵活插拔,可替换
  7. - 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
  8. 缺点:
  9. - 架构复杂了,业务没有明显的流程线,不好管理
  10. - 需要依赖于Broker的可靠、安全、性能

2.常见的MQ技术


RabbitMQ
ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

基本消息队列的消息发送流程:

  1. 1. 建立connection
  2. 2. 创建channel
  3. 3. 利用channel声明队列
  4. 4. 利用channel向队列发送消息
  5. 基本消息队列的消息接收流程:
  6. 1. 建立connection
  7. 2. 创建channel
  8. 3. 利用channel声明队列
  9. 4. 定义consumer的消费行为handleDelivery()
  10. 5. 利用channel将消费者与队列绑定

3.SpringAMQP

  1. SpringAMQP提供了三个功能:
  2. - 自动声明队列、交换机及其绑定关系
  3. - 基于注解的监听器模式,异步接收消息
  4. - 封装了RabbitTemplate工具,用于发送消息
  1. 1.引入依赖
  2. 2.yml中添加配置
  3. 3.然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
  4. 4.消息接收
  5. 首先配置MQ地址,在consumer服务的application.yml中添加配置:

总结
Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

    3.3.发布/订阅

    ```java
  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

  1. <a name="qusfk"></a>
  2. ## 3.4.Fanout
  3. ```java
  4. 在广播模式下,消息发送流程是这样的:
  5. - 1) 可以有多个队列
  6. - 2) 每个队列都要绑定到Exchange(交换机)
  7. - 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  8. - 4) 交换机把消息发送给绑定过的所有队列
  9. - 5) 订阅队列的消费者都能拿到消息
  10. 我们的计划是这样的:
  11. - 创建一个交换机 itcast.fanout,类型是Fanout
  12. - 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
总结
交换机的作用是什么?

- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

- Queue
- FanoutExchange
- Binding