1. RabbitMQ

  1. 使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现,AMQP 主要特征是面向消息、队列、路由(包括点对点和发布 / 订阅)、可靠性、安全,该协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次;

2. AMQP 高级消息队列协议

  1. AMQPAdvanced Message Queuing Protocol),具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,应用层协议的一个开放标准,为面向消息的中间件设计;

2.1 协议模型

image.png

2.2 核心概念

  • Server:又称 Broker,接受客户端的连接,实现 AMQP 实体服务;
  • Connection:连接,应用程序与 Broker 的网络连接;
  • Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道;客户端可建立多个 Channel,每个 Channel 代表一个会话任务;
  • Message:消息,服务器和应用程序之间传送的数据,由 Properties 和 Body 组成;Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特效;Body 则是消息体内容;
  • Virtual Host:虚拟地址,由于进行逻辑隔离,最上层的消息路由;一个 Virtual Host 里面可以有若干个 Exchange 和 Queue,同一个 Virtual Host 里面不能有相同名称的 Exchange 和 Queue;
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定队列;
  • Binding:Exchange 和 Queue 之间的虚拟连接,Bingding 中可以包含 Routing Key;
  • Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息;
  • Queue:Message Queue,消息队列,保存消息并将它们转发给消费者;

3. RabbitMQ 消息流转

RabbitMQ (一) 概述 - 图2

  • 生产者提供 Routing Key 和绑定 Exchange,并投递消息到 Exchange 中;
  • 消费者只需从关联的 Queue 中消费消息;

4. Exchange 交换机

RabbitMQ (一) 概述 - 图3

  • Name:交换机名称;
  • Type:交换机类型 direct、topic、fanout、headers;
  • Durability:是否需要持久化;
  • Auto Delete:当最后一个绑定到 Exchange 上的队列删除后,自动删除 Exchange;
  • Internal:当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 false;
  • Arguments:扩展参数,用于扩展 AMQP 协议自制化使用;

4.1 Direct 模式

  • 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue;
  • 注意:Direct 模式可以使用 RabbitMQ 自带的 Exchange:default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作,消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃;

4.2 Topic 模式

  • 所有发送到 Topic Exchange 的消息被转发到所有关心 RouteKey 中指定 Topic 的 Queue 上;
  • Exchange 将 RouteKey 和某 Topic 进行模糊匹配,此时队列需要绑定一个 Topic;
  • 注意:可以使用通配符进行模糊匹配(”#” 匹配一个或多个词,”*” 匹配不多不少一个词);

4.3 Fanout 模式

  • 不处理路由键,只需要简单的将队列绑定到交换机上;
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
  • Fanout 交换机转发消息是最快的,没有路由匹配;

4.4 Headers 模式

  1. Exchange 不依赖于 RouteKey BindingKey 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配;

5. Binding 绑定

  • Exchange 和 Exchange、Queue 之间的连接关系;
  • Binding 中可以包含 RouteKey 或者参数;

6. Queue 消息队列

  • 消息队列,实际存储消息数据;
  • Durability:是否持久化,Durable:是,Transient:否;
  • Auto delete:yes,代表当最后一个监听被移除之后,该 Queue 会自动被删除;

7. Message 消息

  1. 服务器和应用程序之间传送的数据,本质上就是一段数据,由 Properties PayloadBody)组成;<br /> 常用属性:delivery modeheaders(自定义属性);<br /> 其他属性:
  • content_type、content_encoding、priority;
  • correlation_id、reply_to、expiration、message_id;
  • timestamp、type、user_id、app_id、cluster_id;

8. Virtual Host 虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由;
  • 一个 Virtual Host 里面可以有若干个 Exchange 和 Queue;
  • 同一个 Virtual Host 里面不能有相同名称的 Exchange 和 Queue;

9. RabbitMQ 高级特性

9.1 消息 100% 投递保障

  1. 生产端可靠性投递:
  • 保障消息的成功发出;
  • 保障 MQ 节点的成功接收;
  • 发送端收到 MQ 节点(Broker)确认应答;
  • 完善的消息进行补偿机制;

    解决方案:

  • 方案一:消息落库,对消息状态进行打标;
    RabbitMQ (一) 概述 - 图4

  • 方案二:消息的延迟投递,做二次确认,回调检查;(减少一次数据库操作,适合高并发)
    RabbitMQ (一) 概述 - 图5

9.2 消息重复消费 - 幂等性保障

  1. 消费端实现幂等性,就意味着,消息永远不会消费多次,即使收到多条相同的消息;
  2. 解决方案:
  • 方案一:唯一 ID + 指纹码机制,利用数据库主键去重;
    • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一 ID + 指纹码
    • 好处:实现简单;
    • 坏处:高并发下有数据库写入的性能瓶颈;
    • 优化:跟进 ID 进行分库分表进行算法路由;
  • 方案二:利用 Redis 原子性实现;
    • 考虑的问题1:是否进行数据落库,如果落库,关键解决的问题是数据库和缓存如何做到原子性?
    • 考虑的问题2:如果不进行落库,都存储到缓存中,如何设置定时同步策略?

9.3 Confirm 确认消息机制

  • 消息的确认,指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答;
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障!

    实现 Confirm 确认消息:

  1. 在 Channel 上开启确认模式:
    channel.confirmSelect();
  2. 在 Channel 上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理;

9.4 Return 消息机制

  • Return Listener 用于处理一些不可路由的消息;
  • 某些情况下,如果发送消息时,当前的 Exchange 不存在或者指定的 RouteKey 路由不到,这时需要通过 Return Listener 监听这种不可达的消息;

    实现 Return 消息机制:

  1. Mandatory,如果 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果 false,那么 Broker 端自动删除该消息;
  2. 在 Channel 上添加监听:addReturnListener,监听返回结果,根据具体的结果对消息进行后续处理;

9.5 消费端自定义监听

  1. 继承 DefaultConsumer,实现 handleDelivery 方法;

9.6 消费端限流

  1. 现象:RabbitMQ 服务器堆积上万条未处理的消息,消费端启动后,巨量的消息瞬间全部推送过来,无法同时处理这么多数据;
  2. 解决方案:
  • RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 Consume 或者 Channel 设置 Qos 的值)未被确认前,不进行消费新的消息;
  • void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);(prefetchSize:0;prefetchCount:告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 Consumer 将 Block 掉,直到有消息 ack;global:true/false 是否将上面设置应用于 Channel,即上面限制是 Channel 级别还是 Consumer 级别;)
  • 注意:prefetchSize 和 global,RabbitMQ 没有实现;prefetchCount 在 no_ack=false 情况下生效,即在自动应答的情况下不生效,必须设置为手工签收(channel.basicAck(deliveryTag, false));

9.7 消费端 ACK 与重回队列机制

  1. 消费端的手工 ACK NACK(需要关闭自动应答):
  • 消费端进行消费时,如果由于业务异常可以进行日志的记录,然后进行补偿;
  • 如果由于服务器宕机等严重问题,就需要手工进行 ACK 保障消费端消费成功;

    消费端的重回队列(需要在手工签收下进行 Nack 才能使用):

  • 消费端重回队列时为了对没有处理成功的消息,把消息重新回递给 Broker;

  • 一般在实际应用中,会关闭重置队列;

9.8 TTL 队列 / 消息

  • TTL:Time To Live,生存时间;
  • RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定;
  • RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,所有消息会自动的清除;

9.9 死信队列

  • DLX,Dead-Letter-Exchange;利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是 DLX;
  • DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性;
  • 当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列;
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补 RabbitMQ 3.0 以前支持的 immediate 参数的功能;

    消息变成死信的情况:

  • 消息被拒绝(basic.reject / basic.nack)并且 requeue = false;

  • 消息 TTL 过期;
  • 队列达到最大长度;

    死信队列设置:

  1. 首先需要设置死信队列的 Exchange 和 Queue,然后进行绑定(Exchange:dlx.exchange;Queue:dlx.queue;RoutingKey:#);
  2. 然后进行正常声明交换机、队列、绑定,只不过需要在队列加上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);
  3. 这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列;

10. RabbitMQ 集群架构

10.1 主备模式

  • 实现 RabbitMQ 的高可用集群,一般在并发和数据量不高的情况下,这种模型非常好且简单;主备模式也称之为 Warren 模式;
  • 主 / 备方案(主节点如果挂了,从节点提供服务而已,和 ActiveMQ 利用 Zookeeper 做主备一样);

RabbitMQ (一) 概述 - 图6

10.2 远程模式

  1. 可以实现双活的一种模式,简称 Shovel 模式,所谓 Shovel 就是可以把消息进行不同数据中心的复制工作,远距离通信和复制,可以跨地域的让两个 MQ 集群互联;

RabbitMQ (一) 概述 - 图7

RabbitMQ (一) 概述 - 图8

10.3 镜像模式

  1. 集群模式非常经典的就是 Mirror 镜像模式,保证 100% 数据不丢失,在实际工作中也是用的最多的,并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式;
  2. Mirror 镜像队列,目的是为了保证 RabbitMQ 数据的高可靠性解决方案,主要是实现数据的同步,一般来讲是 2-3 个节点实现数据同步(对于100%数据可靠性解决方案一般是3节点);

RabbitMQ (一) 概述 - 图9

10.4 多活模式

  1. 实现异地数据复制的主流模式,因为 Shovel 模式配置比较复杂,所以一般来说实现异地集群都是使用这种双活或多活模式来取代实现的;这种模型需要依赖 RabbitMQ federation 插件,可以实现持续的可靠的 AMQP 数据通信,多活模式在实际配置与应用非常简单;
  2. RabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享;

RabbitMQ (一) 概述 - 图10

11. RabbitMQ 知识点

11.1 使用消息队列的好处

  • 解耦,系统 A 在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦;
  • 异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度;
  • 削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常;

11.2 使用消息队列会的缺点

  • 系统可用性降低:消息队列挂了,系统无法保证可用;
  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输等;

11.3 消息基于什么传输?

  • 由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈;
  • RabbitMQ 使用信道的方式来传输数据,信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制;

11.4 如何解决丢数据?

生产者丢数据:

  • 生产者的消息没有投递到 MQ 中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ 提供 transaction 和 confirm 模式来确保生产者不丢消息;
  • transaction 机制就是说,发送消息前,开启事物 (channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚 (channel.txRollback()),如果发送成功则提交事物(channel.txCommit());
  • 缺点就是吞吐量下降,因此,生产上用 confirm 模式的居多,一旦 channel 进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个 Ack 给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果 RabbitMQ 没能处理该消息,则会发送一个 Nack 消息给你,就可以进行重试操作;

消息队列丢数据:

  • 处理消息队列丢数据的情况,一般是开启持久化磁盘的配置,这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号,这样,如果消息持久化磁盘之前, RabbitMQ 阵亡了,那么生产者收不到 Ack 信号,生产者会自动重发;
  • 如何持久化:
    1. 将 Queue 的持久化标识 durable 设置为 true,则代表是一个持久的队列;
    2. 发送消息的时候将 deliveryMode=2;
  • RabbitMQ 就算挂了,重启后也能恢复数据,在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入 mirrored-queue 即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉);

消费者丢数据:
启用手动确认模式可以解决这个问题:

  • 自动确认模式:消费者挂掉,待 Ack 的消息回归到队列中,消费者抛出异常,消息会不断的被重发,直到处理成功,不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试;
  • 手动确认模式:如果消费者来不及处理就死掉时,没有响应 Ack 时会重复发送一条信息给其他消费者,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常,如果对异常进行了捕获,但是没有在 finally 里 Ack,也会一直重复发送消息(重试机制);
  • 不确认模式:acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发;

11.5 如何确保消息接收方消费消息?

  1. 接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作),只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除,这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ Consumer 足够长的时间来处理消息;
  2. 下面罗列几种特殊情况:
  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者(可能存在消息重复消费的隐患,需要根据 bizId 去重);
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息;

    11.6 如何避免消息重复投递或重复消费?

  • 在消息生产时,RabbitMQ 内部针对每条生产者发送的消息生成一个 inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;

  • 在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重和幂等的依据,避免同一条消息被重复消费;

    这个问题针对业务场景来回答,分以下几点:

  • 比如,拿这个消息做数据库的 insert 操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据;

  • 再比如,拿这个消息做 Redis 的 set 操作,不用解决,因为无论 set 几次结果都是一样的,set 操作本来就算幂等操作;
  • 如果上面两种情况还不行,上大招,准备一个第三方介质来做消费记录,以 Redis 为例,给消息分配一个全局 id,只要消费过该消息,将 以 K-V 形式写入 Redis,那消费者开始消费前,先去 Redis 中查询有没消费记录即可;