场景
一个系统,由生产者和消费者系统两个环节组成,生产者系统会负责不停的把消息写入到 RocketMQ 中去,然后消费者系统就是负责从 RocketMQ 里消费消息。
这个系统在生产环境是有高峰和低谷的,在晚上几个小时的高峰期内,大概就会有 100 多万条消息进入 RocketMQ。然后消费者系统从 RocketMQ 里获取到消息之后,会依赖一些 NoSQL 数据库去进行一些业务逻辑的实现。
然后有一天晚上就出现了一个问题,消费者系统依赖的 NoSQL 数据库就挂掉了,导致消费者系统自己也没法运作了,此时就没法继续从 RocketMQ 里消费数据和处理了,消费者系统几乎就处于停滞不动的状态。
然后生产者系统在晚上几个小时的高峰期内,就往 MQ 里写入了 100 多万的消息,此时都积压在 MQ 里了,根本没人消费和处理。
解决方案
丢弃全部消息
针对这种紧急的线上事故,一般来说有几种方案可以快速搞定他,如果这些消息你是允许丢失的,那么此时你就可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已。
紧急扩容
但是往往对很多系统而言,不能简单粗暴的丢弃这些消息,所以最常见的办法,还是先等待消费者系统底层依赖的 NoSQL 数据库先恢复了,恢复之后,就可以根据你的线上 Topic 的 MessageQueue 的数量来看看如何后续处理。
增加消费者系统
假如你的 Topic 有 20 个 MessageQueue,然后你只有 4 个消费者系统在消费,那么每个消费者系统会从 4 个MessageQueue里获取消息,所以此时如果你仅仅依靠4个消费者系统是肯定不够的,毕竟MQ里积压了百万消息了。
所以此时你可以临时申请 16 台机器多部署 16 个消费者系统的实例,然后 20 个消费者系统同时消费,每个人消费一个 MessageQueue 的消息,此时你会发现你消费的速度提高了 5 倍,很快积压的百万消息都会被处理完毕。
但是这里你同时要考虑到你的消费者系统底层依赖的 NoSQL 数据库必须要能抗住临时增加了 5 倍的读写压力,因为原来就 4 个消费者系统在读写NoSQL,现在临时变成了20个消费者系统了。
当你处理完百万积压的消息之后,就可以下线多余的16台机器了。
这是一个最最常见的处理百万消息积压的办法,大体思路跟石杉老师在《互联网Java工程师面试突击(第一季)》里讲解的方案是一样的,只不过这里细化到根据 RocketMQ 的技术原理来讲解。
新建 Topic 分发
那么如果你的 Topic 总共就只有 4 个MessageQueue,然后你就只有 4 个消费者系统呢?
这个时候就没办法扩容消费者系统了,因为你加再多的消费者系统,还是只有 4 个 MessageQueue,没法并行消费。
所以此时往往是临时修改那 4 个消费者系统的代码,让他们获取到消息然后不写入 NoSQL,而是直接把消息写入一个新的 Topic,这个速度是很快的,因为仅仅是读写 MQ 而已。
然后新的 Topic 有 20 个 MessageQueue,然后再部署 20 台临时增加的消费者系统,去消费新的 Topic 后写入数据到 NoSQL 里去,这样子也可以迅速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。
总结
处理线下消息堆积问题
- 先定位到消费者系统的问题,确保其消费速度恢复
- 解决消息堆积问题
- 如果消息允许丢失,则修改消费者的代码,使得消费者获取到消息后直接丢弃
- 如果消息不允许丢失
- 如果消息 Topic 对应的 MessageQueue 较多而消费者 consumer 较少时,此时可以临时增加消费者系统的数量来加快消费的速率
- 如果消息 Topic 对应的 MessageQueue 较少时,不管消费者数量多少都无法加快消费速率,此时可以增加一个新的 Topic 来调大 MessageQueue 的数量,修改原消费者系统代码将消息写入新的 Topic