现在的互联网应用基本上都是采用分布式系统架构进行设计的,
而很多分布式系统必备的一个基础软件就是消息队列。

消息队列要能支持组件通信消息的快速读写,
而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。

不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个问题:“Redis 适合做消息队列吗?”
其实,这个问题的背后,隐含着两方面的核心问题:

  • 消息队列的消息存取需求是什么?
  • Redis 如何实现消息队列的需求?

这节课,我们就来聊一聊消息队列的特征和 Redis 提供的消息队列方案。
只有把这两方面的知识和实践经验串连起来,才能彻底理解基于 Redis 实现消息队列的技术实践。
以后当你需要为分布式系统组件做消息队列选型时,
就可以根据组件通信量和消息通信速度的要求,选择出适合的 Redis 消息队列方案了。


我先介绍一下消息队列存取消息的过程
在分布式系统中,当两个组件要基于消息队列进行通信时,
一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;
远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。

为了方便你理解,我还是借助一个例子来解释一下。
假设组件 1 需要对采集到的数据进行求和计算,并写入数据库,
但是,消息到达的速度很快,组件 1 没有办法及时地既做采集,又做计算,并且写入数据库。
所以,我们可以使用基于消息队列的通信,让组件 1 把数据 x 和 y 保存为 JSON 格式的消息,再发到消息队列,这样它就可以继续接收新的数据了。
组件 2 则异步地从消息队列中把数据读取出来,在服务器 2 上进行求和计算后,再写入数据库。
这个过程如下图所示:
d79d46ec4aa22bf46fde3ae1a99fc2bc.webp
我们一般把消息队列中发送消息的组件称为生产者(例子中的组件 1),
把接收消息的组件称为消费者(例子中的组件 2),
f470bb957c1faff674c08b1fa65a3a62.webp
在使用消息队列时,消费者可以异步读取生产者消息,然后再进行处理。
这样一来,即使生产者发送消息的速度远远超过了消费者处理消息的速度,生产者已经发送的消息也可以缓存在消息队列中,避免阻塞生产者,这是消息队列作为分布式组件通信的一大优势。

不过,消息队列在存取消息时,必须要满足三个需求,分别是:消息保序、处理重复的消息 和 保证消息可靠性。

消息队列的消息存取需求

消息保序

虽然消费者是异步处理消息,
但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。

对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,
就可能会导致业务逻辑被错误执行,从而给业务方造成损失。


我们来看一个更新商品库存的场景
假设生产者负责接收库存更新请求,消费者负责实际更新库存,现有库存量是 10。
生产者先后发送了消息 1 和消息 2,消息 1 要把商品 X 的库存记录更新为 5,消息 2 是把商品 X 库存更新为 3。
如果消息 1 和 2在消息队列中无法保序,出现消息 2 早于消息 1 被处理的情况,很显然,库存更新就出错了。
这是业务应用无法接受的。

面对这种情况,你可能会想到一种解决方案:
不要把更新后的库存量作为生产者发送的消息,而是把库存扣除值作为消息的内容。
这样一来,消息 1 是扣减库存量 5,消息 2 是扣减库存量 2。
如果消息 1 和消息 2 之间没有库存查询请求的话,
即使消费者先处理消息 2,再处理消息 1,这个方案也能够保证最终的库存量是正确的,也就是库存量为 3。

但是,我们还需要考虑这样一种情况:假如消费者收到了这样三条消息:消息 1 是扣减库存量 5,消息 2 是读取库存量,消息 3 是扣减库存量 2,此时,如果消费者先处理了消息 3,那么库存量就变成了8。
然后,消费者处理了消息 2,读取当前的库存量是 8,这就会出现库存量查询不正确的情况。

从业务应用层面看,消息 1、2、3 应该是顺序执行的,所以,消息 2 查询到的应该是扣减了 5 以后的库存量,而不是扣减了 2 以后的库存量。

所以,用库存扣除值作为消息的方案,在消息中同时包含读写操作的场景下,会带来数据读取错误的问题。
而且,用库存扣除值作为消息的方案还会面临一个问题,那就是重复消息处理。

重复消息处理

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。
此时,消费者可能会收到多条重复的消息。
对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,
如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

还是以库存更新为例,假设消费者收到了一次消息 1,要扣减库存量 5,然后又收到了一次消息 1,
那么,如果消费者无法识别这两条消息实际是一条相同消息的话,就会执行两次扣减库存量 5 的操作,
此时,库存量就不对了。这当然也是无法接受的。

消息可靠性保证

另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。
此时,消息队列需要能提供消息可靠性的保证,
也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

Redis的 List 和 Streams 两种数据类型,就可以满足消息队列的这三个需求。

基于 List 的消息队列解决方案

我们先来了解下基于 List 的消息队列实现方法。

List 本身就是按先进先出的顺序对数据进行存取的,
所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,
而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
b0959216cbce7ac383ce206b8884777c.webp

不过,在消费者读取数据时,有一个潜在的性能风险点。
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,
如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。
如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。

所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

为了解决这个问题,Redis 提供了 BRPOP 命令。
BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,
直到有新的数据写入队列,再开始读取新数据。
和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。


消息保序的问题解决了,接下来,我们还需要考虑解决重复消息处理的问题,
这里其实有一个要求:消费者程序本身能对重复消息进行判断。
一方面,消息队列要能给每一个消息提供全局唯一的 ID 号;
另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。

当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,
来判断当前收到的消息有没有经过处理。
如果已经处理过,消费者程序就不再进行处理了。这种处理特性也称为幂等性,幂等性就是指:对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。
不过,List 本身是不会为每个消息生成 ID 号的,
所以,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。
生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

例如,我们执行以下命令,就把一条全局ID为 101030001、库存量为 5 的消息插入了消息队列:
LPUSH mq "101030001:stock:5"


最后,我们再来看下,List 类型是如何保证消息可靠性的。
当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。
所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,
那么,消费者程序再次启动后,就没办法再次从 List 中读取消息了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,
这个命令的作用是:让消费者程序从一个 List 中读取消息,
同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。
这样一来,如果消费者程序读了消息但没能正常处理,
等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

5045395da08317b546aab7eb698d013d.webp
生产者先用 LPUSH 把消息“5”“3”插入到消息队列 mq 中。
消费者程序使用 BRPOPLPUSH 命令读取消息“5”,同时,消息“5”还会被 Redis 插入到 mqback 队列中。
如果消费者程序处理消息“5”时宕机了,等它重启后,可以从 mqback 中再次读取消息“5”,继续处理。

到这里,你可以看到,基于 List 类型,我们可以满足分布式组件对消息队列的三大需求。
但是,在用 List 做消息队列时,我们还可能遇到过一个问题:
生产者消息发送很快,而消费者处理消息的速度比较慢,
这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力
这个时候,我们希望启动多个消费者程序组成一个消费组,一起分担处理 List 中的消息。
但是,List 类型并不支持消费组的实现。
那么,还有没有更合适的解决方案呢?
这就要说到 Redis 从 5.0 版本开始提供的 Streams 数据类型了。
和 List 相比,Streams 同样能够满足消息队列的三大需求。
而且,Streams 还支持消费组形式的消息读取。

基于 Streams 的消息队列解决方案

接下来,我们就来了解下 Streams 的使用方法。

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID
  • XREAD:用于读取消息,可以按 ID 读取数据
  • XREADGROUP:按消费组形式读取消息
  • XPENDING 和 XACK:

XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,
XACK 命令用于向消息队列确认消息处理已完成。


首先,我们来学习下 Streams 类型存取消息的操作 XADD
XADD 命令可以往消息队列中插入新消息,消息的格式是:键 - 值对形式。
对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。

比如:我们执行下面的命令,往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。
其中,消息队列名称后面的 ,表示让 Redis 为插入的数据自动生成一个全局唯一的 ID。
当然,我们也可以不用
,直接在消息队列名称后自行设定一个 ID 号,只要保证这个 ID 号是全局唯一的就行。
不过,相比自行设定 ID 号,使用 会更加方便高效。
命令:`XADD mqstream
repo 5<br />返回结果:“1599203861727-0”`

可以看到,消息的全局唯一 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。
例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。


当消费者需要读取消息时,可以直接使用 XREAD命令从消息队列中读取消息。
XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。
比如:我们可以执行下面的命令,从ID号为 1599203861727-0 的消息开始,
读取后续的所有消息(示例中一共 3 条)。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
返回结果:

  1. 1) 1) "mqstream"
  2. 2) 1) 1) "1599274912765-0"
  3. 2) 1) "repo"
  4. 2) "3"
  5. 2) 1) "1599274925823-0"
  6. 2) 1) "repo"
  7. 2) "2"
  8. 3) 1) "1599274927910-0"
  9. 2) 1) "repo"
  10. 2) "1"

另外,消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。
当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长在 block 配置项进行设置。

比如:下面的命令,其中,命令最后的“$”符号表示:读取最新的消息,
同时,我们设置了 block 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,
如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。
下面命令中的 XREAD 执行后,消息队列 mqstream 中一直没有消息,
所以,XREAD 在 10 秒后返回空值(nil)。
XREAD block 10000 streams mqstream $


刚刚讲到的这些操作是 List 也支持的,接下来,我们再来学习下 Streams 特有的功能
Streams 本身可以使用 XGROUP 创建消费组,
创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息,

比如:我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream。
XGROUP create mqstream group1 0
然后,我们再执行一段命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,
其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,
所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共4条)。
XREADGROUP group group1 consumer1 streams mqstream >

  1. 1) 1) "mqstream"
  2. 2) 1) 1) "1599203861727-0"
  3. 2) 1) "repo"
  4. 2) "5"
  5. 2) 1) "1599274912765-0"
  6. 2) 1) "repo"
  7. 2) "3"
  8. 3) 1) "1599274925823-0"
  9. 2) 1) "repo"
  10. 2) "2"
  11. 4) 1) "1599274927910-0"
  12. 2) 1) "repo"
  13. 2) "1"

需要注意的是,
消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。
使用消费组的目的是:让组内的多个消费者共同分担读取消息,
所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

比如:我们执行下列命令,让 group2 消费组中的 consumer1、2、3 消费者各自读取一条消息。

  1. XREADGROUP group group2 consumer1 count 1 streams mqstream >
  2. 1) 1) "mqstream"
  3. 2) 1) 1) "1599203861727-0"
  4. 2) 1) "repo"
  5. 2) "5"
  6. XREADGROUP group group2 consumer2 count 1 streams mqstream >
  7. 1) 1) "mqstream"
  8. 2) 1) 1) "1599274912765-0"
  9. 2) 1) "repo"
  10. 2) "3"
  11. XREADGROUP group group2 consumer3 count 1 streams mqstream >
  12. 1) 1) "mqstream"
  13. 2) 1) 1) "1599274925823-0"
  14. 2) 1) "repo"
  15. 2) "2"

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,
直到消费者使用 XAC K命令通知 Streams“消息已经处理完成”。
如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。
此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

比如:我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数。
其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。

  1. XPENDING mqstream group2
  2. 1) (integer) 3
  3. 2) "1599203861727-0"
  4. 3) "1599274925823-0"
  5. 4) 1) 1) "consumer1"
  6. 2) "1"
  7. 2) 1) "consumer2"
  8. 2) "1"
  9. 3) 1) "consumer3"
  10. 2) "1"

如果我们还需要进一步查看某个消费者具体读取了哪些数据,可以执行下面的命令:

  1. XPENDING mqstream group2 - + 10 consumer2
  2. 1) 1) "1599274912765-0"
  3. 2) "consumer2"
  4. 3) (integer) 513336
  5. 4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1599274912765-0。
一旦消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,
然后这条消息就会被删除。
当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

  1. XACK mqstream group2 1599274912765-0
  2. (integer) 1
  3. XPENDING mqstream group2 - + 10 consumer2
  4. (empty list or set)

现在,我们知道了用 Streams 实现消息队列的方法,
我还想再强调下,Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型,
如果你的 Redis 是 5.0 及以后的版本,就可以考虑把 Streams 用作消息队列了。

小结

这节课,我们学习了分布式系统组件使用消息队列时的三大需求:消息保序、重复消息处理和消息可靠性保证,
这三大需求可以进一步转换为对消息队列的三大要求:消息数据有序存取,消息数据具有全局唯一编号,以及消息数据在消费完成后被删除。

我画了一张表格,汇总了用 List 和 Streams 实现消息队列的特点和区别。


基于 List 基于 Streams
消息保序 使用 LPUSH / RPOP 使用 XADD / XREAD
阻塞读取 使用 BRPOP 使用 XREAD block
重复消息处理 生产者自行实现全局唯一 ID Streams自动生成全局唯一 ID
消息可靠性 使用 BRPOPLPUSH 使用 PENDING List 自动留存消息,
使用 XPENDING 查看,使用 XACK 确认消息
适用场景 适合 Redis5.0 前版本的部署环境
适合 消息总量小 的场景
适合 Redis5.0 及以后版本的部署环境
适合 消息总量大,需要消费组形式读取数据 的场景

关于 Redis 是否适合做消息队列,业界一直是有争论的。
很多人认为,
要使用消息队列,就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,
而 Redis 更加适合做缓存。

根据这些年做 Redis 研发工作的经验,我的看法是:
Redis 是一个非常轻量级的键值数据库,
部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。
而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如:Kafka 的运行就需要再部署 ZooKeeper。
相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

所以,关于是否用 Redis 做消息队列的问题,不能一概而论,
我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。
如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,
而且,Redis 的高性能特性能支持快速的消息读写,也是消息队列的一个好的解决方案。

每课一问

如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理。
例如,一个消息是一条从业务系统采集的数据,既要被消费者 1 读取进行实时计算,
也要被消费者 2 读取并留存到分布式文件系统HDFS中,以便后续进行历史查询。
你会使用 Redis 的什么数据类型来解决这个问题呢?