1.mq的基本概念

:::tips MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。 :::

Rabbitmq核心概念与使用场景 - 图2

1.1 mq的优势

:::tips 1.应用解耦.一个系统包括订单服务,支付服务,库存服务,系统之间耦合性高,容错率低,可维护性低 ::: Rabbitmq核心概念与使用场景 - 图3

使用 MQ 使得应用间解耦,提升容错性和可维护性.一个服务的不可用,不会影响其他功能.如下

Rabbitmq核心概念与使用场景 - 图4

:::tips 2.异步提速 比如有一个很大的报文需要进行解析后入库,此时就可以将报文存入mq,异步进行消费,从而提高服务的响应能力. ::: 再比如下面:
Rabbitmq核心概念与使用场景 - 图5

引入mq后原本一个下单操作从720ms变为25ms,提高了响应速度,用户体验以及系统吞吐量
Rabbitmq核心概念与使用场景 - 图6

:::tips 3.削峰填谷 ,如果系统只支持1000个并发.但是一次性过来5000个,这就会导致服务故障 ::: Rabbitmq核心概念与使用场景 - 图7

引入mq后.所有的并发都经过mq.服务就每秒拿1000个请求.
Rabbitmq核心概念与使用场景 - 图8

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
image.png :::danger 总结mq 优势包括 解耦,异步提速以及削峰填谷 :::

1.2 mq的劣势

  • 系统可用性降低 :::tips 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用? ::: Rabbitmq核心概念与使用场景 - 图10

  • 系统复杂度提高 :::tips MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况? :::

    2.rabbitmq快速入门

    2.1 简介

    AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。

image.png

2.2 rabbitmq相关概念

Broker
接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection
publisher/consumer 和 broker 之间的 TCP 连接
Channel
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue
消息最终被送到这里等待 consumer 取走
Binding
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

2.3.rabbitmq几种工作模式

:::tips RabbitMQ提供了7种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ),消息确认模式。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html ::: image.png

image.png

image.png

3rabbitmq高级特性

  • 消费端限流
  • TTL
  • 死信队列
  • 延迟队列
  • 消息可靠性投递
  • Consumer ACK

    3.1 消息的可靠投递

    :::tips 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式 ::: image.png :::tips rabbitmq 整个消息投递的路径为:

  • producer—->rabbitmq broker—->exchange—->queue—->consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange—>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递 ::: 消息的可靠投递小结 :::tips

  • 设置ConnectionFactory的publisher-confirms=”true” 开启 确认模式。
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns=”true” 开启 退回模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。 :::

    3.2 Consumer Ack

    :::tips ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
    有三种确认方式:
    •自动确认:acknowledge=”none”
    •手动确认:acknowledge=”manual”
    •根据异常情况确认:acknowledge=”auto”
    其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息 ::: Consumer Ack 小结 :::tips

  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  • 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。 :::

    3.3 可靠性消息总结

    :::tips 1.持久化
    •exchange要持久化
    •queue要持久化
    •message要持久化
    1.生产方确认Confirm
    2.消费方确认Ack
    3.Broker高可用 :::

    3.4 消费端限流

  • 中配置 prefetch属性设置消费端一次拉取多少消息

  • 消费端的确认模式一定为手动确认。acknowledge=”manual”

Rabbitmq核心概念与使用场景 - 图16

3.5 TTL

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

image.png

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。

    3.5 死信队列

    :::tips 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 ::: image.png

消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机: :::tips 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key ::: 注意:
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器
  2. 延迟队列

很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果
image.png

消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

  • 消息幂等性保障—乐观锁机制
  • 唯一主键保证消费一次

    消息积压

    消费者宕机积压
    -消费者消费能力不足积压
    -发送者发流量太大
    解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

    rabbitmq微服务实战

    springboot整合rabbitmq