1、什么是 RabbitMQ

采用 AMQP 高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦。

RabbitMQ 都有哪些组件构成?

  • Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher:消息的生产者,也是向交换机发布消息的客户端应用程序。
  • Broker:消息队列的服务器实体
  • Connection:网络连接,比如一个 TCP 连接。
  • Exchange:交换机 ,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
  • Binding:绑定,用于消息队列和交换机之间的关联。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。
  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

交换机都有哪几种

直接交换机

  • 比如,现在有一个直接交换机,它绑定了3个队列,第一个叫 dog,第二个叫 dog.gurad,第三个叫 dog.puppy,如果说消息发送过来,我们用的路由键叫 dog,那它就会精确的只发送给 dog 队列,实现消息最终只能到达一个队列,这就叫直接类型交换机,也称为单播模式、点对点通信

广播类型交换机

  • 如果交换机下绑定了3个队列,消息一到达交换机,这3个队列都会收到, 这个消息会广播出去,根本就不关心路由键是什么,把所有消息都通过交换机广播给它绑定的所有队列,被称为广播模式

主题类型交换机

  • 虽然它也是广播模式,比如它绑定了几个交换机,但是它可以指定某些交换机来发送消息,其余没指定的,则不会收到消息,所以它是部分广播,主要是根据路由键匹配将消息发个队列,这就是主题-发布订阅模式

RabbitMQ 的工作流程

  1. 无论是生产者(Publisher)想要发消息(Message),还是消费者(Consumer)要接消息(Message),它们都必须跟 RabbitMQ 建立一条连接(Connection)
  2. 所有的收发数据都需要在连接(Connection)里面开辟信道(Channel)进行收发,想要收发的都是消息(Message),所以我们要构造一个消息(Message),消息有头有体,头相当于是对参数的一些设置、命令,体就是消息的真正内容,而消息里面最重要的一个就是路由键(routing-key)
  3. 我们将消息指定好路由键(routing-key)要发给谁以后,消息(Message)先来到消息代理(Broker)指定的一个虚拟主机(Virtual Host)里边, 由虚拟主机(Virtual Host)里边指定交换机(Exchange)
  4. 这就相当于我们要发消息(Message)的时候,我们还要指定好要发给哪个交换机(Exchange)
  5. 由指定的交换机(Exchange)收到消息以后,它根据我们指定的路由键(routing-key),通过交换机跟其它队列(Queue)的绑定关系,将这个消息放到哪个队列(Queue)
  6. 然后消费者(Consumer)就会监听这个队列,队列里面的内容就会被消费者(Consumer)实时拿到,当然也是通过信道(Channel)拿到的

消息基于什么传输

RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制

说说你们项目里是怎么用消息队列的?

  • 我们有一个订单系统,订单系统会每次下一个新订单的时候,就会发送一条消息到 RabbitMQ 里面去,后台有一个库存系统,负责获取消息,会对库存进行锁定,直到用户付款成功,才会解锁并扣减库存,这个目的是对订单系统进行解耦,并且利用异步还提升了请求的响应速度。
  • 我们还在秒杀系统使用了消息队列,进行队列削峰

你的订单系统不发送消息到 MQ,直接调用库存系统的一个接口,也能直接调用成功,库存也能更新,根本不需要使用消息队列啊?

是,的确是可以,但是有一个问题,假设库存锁定成功,将结果返回到了订单服务,订单服务根据结果又调用了积分服务,让它扣减积分,结果积分服务内部出现异常,积分数据回滚,此时订单服务感知到积分服务抛出来的异常,订单数据也回滚了,但是库存服务不会有任何感知,最终结果就是:订单、积分的数据全部回滚了,结果库存一直在那锁定着。

为什么不使用 Redis 作为消息队列

因为redis 的消息推送是基于 发布&订阅 功能或者 list 结构的。这套系统不适合做消息队列。

  • 发布&订阅 断电就清空数据,并不会对数据进行持久化。因为消息一发布,就把消息都推送给订阅者了,然后进行删除操作,数据在内存保存的时间很短。
  • 如果使用 list 做消息队列, rdb 或者 aof 会进行持久化,但是同样也会丢一点数据
  • redis 发布订阅不支持负载均衡。功能类似与 udp,所有监控这个 topic 消息的服务器都会接收到这个消息。而别的 mq,只有这个组的其中一台机器会接收,可以做负载均衡。
  • 消息传递不可靠,不能保证消息一定发布成功了。这个功能只能通过代码来实现。

    为什么使用消息队列?

为了解决分布式事务的问题,我们当时了解了常见的几种解决方案,2PC、TCC 事务补偿方案、最大努力通知方案

2PC

  • 就是二阶段提交协议,遵循 ACID 这个原则的事务,是强一致性的,里面有一个总事务管理器,多个本地资源管理器,本地资源管理器就是每个服务的事务管理器,总事务管理器将整个分布式事务拆成了两个阶段,第一个阶段,它会询问每个本地资源管理器,是否准备好提交了,然后这些本地资源管理器就会检查它们当前的这些数据有没有准备好、连接正不正常、能不能提交数据,一旦都准备好了,就会告诉总事务管理器,总事务管理器收到消息后就会让他们提交,然后就成功了,但是如果在询问阶段,有一个小事务说自己不能提交,那事务管理器知道之后,就会要求所有人回滚
  • 这种解决方案的代表有 Seata,但是在使用的时候,我们发现 Seata 在执行过程中,首先要获取全局锁、还有其它各种锁,再加上后面的隔离也要使用各种锁机制,这样的话,相当于做一个事务,它要加非常多的锁才行,一加锁以后,相当于把并发变成串行化了,如果在订单系统里面用的话,相当于一个用户下单之后,所有人都要等他下单成功之后才能继续下单,这样系统就没法用了。

3PC

  • 当然还有 3PC 模式,这个模式将整个预备阶段又分成了两个阶段,总事务管理器先询问各个本地资源管理器,数据能否提交,本地资源管理器回答能提交;总事务管理器就让本地资源管理器来准备这些数据,开始提交。本地资源管理器就准备这些数据来提交;然后最后一个阶段,只要本地资源管理器都告诉总事务管理器准备好了, 它就让所有本地资源管理器提交
  • 这种方式跟 2PC 模式没有什么太大的差别,所以我们也没考虑

柔性事务 - TCC 事务补偿方案

  • 它遵循 BASE 理论,是实现最终一致性的。它将事务分为了三个阶段:第一阶段是 Try,尝试阶段,就是我们预准备一些数据;接下来第二阶段就是 Confirm,就是把我们准备的数据提交;第三阶段就是 Cancel,想要回滚我们提交的数据。
  • TCC 相当于把这些 3PC 搞的东西都给整成手动的,全部都是手动调用,所以我们也没考虑。

柔性事务 - 最大努力通知方案

  • 假设我们现在有一个订单服务、 还有一个库存服务、还有一个大业务,大业务现在调了订单和库存,结果大业务在自己这失败了。然后它就尽最大的努力发消息给 MQ,每隔几秒一发,让订单服务与库存服务监听,一旦这两个服务监听到了消息,就分别去解锁库存、解锁订单,虽然不会确保强一致性,但是保证了最终一致性。

消息队列都有什么优缺点?

优点就是在特定场景下使用,有其对应的好处,比如:解耦、异步、削峰。

缺点有以下几个:

  • 会使系统可用性降低
  • 提高系统复杂度
  • 会产生一致性问题

你们为什么会选择 RabbitMQ 而不是其他 MQ 呢?

首先我们考虑这么几点,吞吐量,我们一开始的业务成交量没有那么大,所以就没有选用像 Kafka、RocketMQ 这种单机吞吐量较大的 MQ,再基于技术的后续维护、消息的可靠性,尤其是 Rabbit MQ 的时效性,它是微秒级的,基于这几个方面来说,Rabbit MQ 比较 ActiveMQ 而言,都有比较明显的优势,所以我们选择了 RabbitMQ。

如何保证消息队列的高可用?

我们采用镜像集群模式,在镜像集群模式下,创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,也就是说,每个 RabbitMQ 节点都有这个 queue 的完整镜像,里面有 queue 的全部数据。然后每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

这样的话,任何一个机器宕机了,都没事,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。

当然这种模式也有一些缺点:

  1. 这个性能开销比较大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!
  2. 没有扩展可言,RabbitMQ 是集群,不是分布式的,所以当某个Queue负载过重,我们并不能通过新增节点来缓解压力,因为所以节点上的数据都是相同的,这样就没办法进行扩展了

对于镜像集群而言,当某个queue负载过重,可能会导致集群雪崩,不过也是有解决办法的

我们可以使用 HA 的 exactly 策略来减少集群雪崩,它是这样操作的,镜像队列将会在集群上复制 count份。如果集群数量少于 count 时候,队列会复制到所有节点上。如果大于 count 集群,有一个节点crash 后,新进入的节点也不会做新的镜像。

如何开启镜像集群模式呢?

就是在 RabbitMQ 的管理控制台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

为什么不采用普通集群模式呢?

普通集群模式,只是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,如果连接到了另外一个实例,那个实例才会从 queue 所在实例上拉取数据过来。

这种方式很麻烦,也不怎么好,根本就没做到所谓的分布式,就是个普通集群。它会导致要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例无法从那个实例拉取,如果开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这就比较尴尬了,这个模式下的 RabbitMQ 根本就没有什么所谓的高可用性这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

如何保证消息不被重复消费?如何保证消息消费时的幂等性?

以我们项目里订单系统的下订单、扣库存为例

  1. 消费者的业务消费接口应该设计为幂等性的。比如扣库存的时候,检查库存工作单的状态
  2. 建立防重表(redis/mysql),每个消息状态是否都被服务器收到都应该记录,处理过就不用处理
  3. RabbitMQ 的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

如何保证消息的可靠性传输,要是消息丢失了怎么办?

这个问题分为好几种情况,我来解释一下

生产者弄丢了数据

  • 生产者将数据发送到 RabbitMQ 的时候,因为网络问题啥的,数据在半路丢了,都有可能。
  • 有两种处理方式:使用 RabbitMQ 事务机制,或者开启 confirm 模式,我们一般会选择开启 confirm 模式,因为事务是同步的,提交一个事务之后,系统会阻塞在那,会导致系统的吞吐量下降,而 confirm 模式是异步的,发送完一个消息之后,可以紧接着发下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

Rabbit MQ 弄丢了数据

  • 这种情况只要开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

消费者弄丢了数据

  • 一定开启手动ACK,消费成功再移除,失败或者没来得及处理就 noAck并重新入队

如何保证消息的顺序性?

一个 queue 多个 consumer ,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1/data3。这样消息就乱了。

解决:拆分成多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

大量消息在 MQ 里积压了几个小时了还没解决

几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

MQ 中的消息过期失效了

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢

这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

MQ 都快写满了

只能说方案一执行的太慢了,只能临时写程序,接入数据来消费了,消费一个丢弃一个,都不要了,然后晚上再补数据。

如果让你写一个消息队列,该如何进行架构设计,说一下你的思路?

  • 首先这个 mq 得支持可伸缩,就是需要的时候快速扩容,就可以增加吞吐量和容量,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,给 topic 增加 partition,然后做数据迁移,增加机器,那就可以存放更多数据,提供更高的吞吐量了?
  • 其次得让这个 mq 的数据要落地磁盘,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候,采用顺序写的方式,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
  • 还有可用性,可以采用多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。