为什么使用MQ,哪些地方使用了MQ?

  1. 登录验证的时候,向用户发送邮件这个事件我是利用mq来实现了消息的异步处理,在点击发送事件之后,生产者这边启动计时器,等待验证码接收,如果规定时间内还未收到消息,则可以重新发送。
  2. 更新热点数据时,用于解决redis缓存和MySQL数据库的一致性问题。
    1. 缓存一致性方案采用的是:对于热点数据设置定时任务,当数据修改的时候采用延迟双删,先更新数据,然后删除
    2. 先更新数据库,后删除缓存。为了保证删除缓存这个事件的可靠性,利用消息队列来实现延迟双删,保证数据最终一致性。

      解耦

      场景:假设对于热点数据,多个服务之间需要消费这个热点数据(可以是根据博客的点赞点踩,做一些定制服务和推送服务),如果是直接让A系统发送给不同的系统BCD,那么会使BCD系统与A系统之间紧耦合,代码难维护。
      mq-1.png
      A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

      使用MQ可以解决

      如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
      mq-2.png

异步

用于缓存延时双删,对于热点数据的缓存,不可能每次对热点数据修改都同步的更新数据库,大大降低了性能,可以建立一个定时任务,先更新数据库后删除缓存的策略,其中删除缓存的操作采用异步删除,将删除缓存的操作加入到消息队列中。

削峰

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。
一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。
但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。
项目场景部分 - 图4
如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。
项目场景部分 - 图5
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

需要注意消息的挤压数,避免发生制定相应的消息挤压丢失策略。

消息队列的问题

可用性问题怎么解决?
系统复杂度?
消息重复消费怎么解决?
消息堆积怎么解决?
消息传输的可靠性怎么解决?
消息的顺序性?
消息丢失怎么解决?

消息队列的高可用

Redis做消息队列

Redis本身是通过主从复制,建立集群来保证高可用的。

RabbitMQ

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。

普通集群模式(无高可用性)mq-7.png

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式(高可用性)

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
项目场景部分 - 图7
那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

Kafka 的高可用性

Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。
这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据
实际上 RabbitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群模式下,也是每个节点都放这个 queue 的完整数据。
Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。
项目场景部分 - 图8
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
项目场景部分 - 图9
这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker 上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧?不至于一无所知,现场还能给面试官画画图。要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。

保证消息不被重复消费

Redis
Stream流在添加数据的时候会给消息创建一个唯一id(生产消息当前的系统启动时间(毫秒级)+该毫秒内该消息的序号)。
当然具体怎么做需要开发者在消费端进行幂等性维护。

  1. 如果是set数据把最简单了,具有天然的幂等性
  2. 如果是普通数据,也可以通过维护一个唯一id来防止重复消费。

mq-11.png

保证消息的可靠传输

主要从三个方面探讨:生产者端导致数据不可靠,消息队列本身数据不可靠消息丢失,消费者端消息丢失。

生产者端导致数据不可靠

消息队列的处理方式都一样,消息重传,知道成功为止,当然会因为网络原因产生重复消息,这个时候就需要消费者端去做幂等性逻辑了。
总之,是可以保证生产者消息可靠的。

  1. redis

redis则是通过重试阻塞式读取消息xadd

  1. RabbitMQ

rabbitmq利用confirm或者事务来保证消息可靠

  1. Kafka

    Kafka因为分布式消息队列,是通过了acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。

    消费者端消息可靠性

    消费者拿到消息后还没有处理完,消费者宕机问题,
    消费者处理完消息后,必须告知MQ,MQ收到ACK之后才会标记已处理,否则仍旧把这些数据发送给消费者。

  2. redis处理的逻辑:

XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。

  1. RabbitMQ处理逻辑

RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。

  1. Kafka处理逻辑

大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

消息队列本身消息丢失(宕机)

  1. redis

根据主从复制或者采用集群策略,尽可能保证数据不丢失,但是因为redis持久化策略(RDB和AOF)的天生缺陷,会使得数据无法保证百分比不丢失

  1. RabbitMQ

哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。

  1. Kafka

Kafka 某个 broker 宕机,然后重新选举 partition 的 leader

消息积压问题

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
揪其原因,问题还是出在消费者端,因为消费端消费的数据不够快次啊会存在消息积压的问题。
举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了;
(在我的项目中不存在这个问题,因为消费者是去删除缓存,而不是去操作数据库,且生产者的消息事件来源于更新数据库)。

大量消息积压怎么搞

  1. 先找到消费者端的故障问题,恢复消费者端的消费速度
  2. 开辟多个消费组,临时分发数据的consumer程序,这个程序部署上去消费积压数据
  3. 等快速消费完积压数据之后,恢复原先部署的架构,重新用原先的consumer机器来消费消息

MQ中的消息过期失效了怎么搞

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。