Redis 的发布与订阅功能由 PUBLISH、SUBSCRIBE、PSUBSCRIBE 等命令组成。通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道(channel)从而成为这些频道的订阅者。每当有其他客户端向被订阅的频道发送消息时,频道的所有订阅者都会收到这条消息。
除了订阅频道之外,客户端还可以通过执行 PSUBSCRIBE 命令订阅一个或多个模式(pattern)从而成为这些模式的订阅者。每当有其他客户端向某个频道发送消息时,消息不仅会被发送给这个频道的所有订阅者,它还会被发送给所有与这个频道相匹配的模式的订阅者。
频道的订阅与退订
当一个客户端执行 SUBSCRIBE 命令订阅某个或某些频道的时候,这个客户端与被订阅频道之间就建立起了一种订阅关系。Redis 将所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典里面,字典的键是某个被订阅的频道,值则是一个链表,链表里记录了所有订阅这个频道的客户端。
struct redisServer {
// 保存所有频道的订阅关系
dict *pubsub_channels;
......
}
每当客户端执行 SUBSCRIBE 命令订阅某个频道时,服务器都会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联,将客户端添加到订阅者链表的末尾。如果该频道还没有任何订阅者,程序会先初始化该频道对应的链表,然后再将客户端添加到链表。
UNSUBSCRIBE 命令的行为和 SUBSCRIBE 命令的行为正好相反,当一个客户端退订某个频道时,服务器将从 pubsub_channels 中解除客户端与被退订频道之间的关联。如果删除退订客户端后,频道的订阅者链表变成了空链表,程序会从 pubsub_channels 字典中删除频道对应的键。
模式的订阅与退订
与频道类似,Redis 将所有模式的订阅关系也都保存在服务器状态的 pubsub_patterns 属性里面。该属性是一个链表结构,链表中的每个节点是一个 pubsubPattern 结构:
struct redisServer {
// 保存所有模式的订阅关系
list *pubsub_patterns;
......
}
typedef struct pubsubPattern {
// 订阅模式的客户端
client *client;
// 被订阅的模式
robj *pattern;
} pubsubPattern;
每当客户端执行 PSUBSCRIBE 命令订阅某个模式时,服务器会对每个被订阅的模式新建一个 pubsubPattern 结构,并设置结构的 pattern 和 client 属性,然后添加到 pubsub_patterns 链表的表尾。
模式的退订命令 PUNSUBSCRIBE 是 PSUBSCRIBE 命令的反操作,当一个客户端退订某个模式时,服务器将会遍历 pubsub_patterns 链表,查找并删除那些 pattern 属性为被退订模式,并且 client 属性为执行退订命令的客户端的 pubsubPattern 结构。
发送频道消息
当客户端执行 PUBLISH
- 将消息发送给频道的所有订阅者,这一过程通过在 pubsub_channels 字典里找到该频道对应的订阅者名单链表,然后将消息发送给名单上的所有客户端。
- 如果有一个或多个模式 pattern 与频道 channel 相匹配,则将消息发送给 pattern 模式的订阅者,这一过程通过遍历整个 pubsub_patterns 链表,查找那些与频道相匹配的模式,并将消息发送给订阅了这些模式的客户端。
可以看到,PubSub 只会把数据发给在线的订阅者,订阅者一旦下线,就会丢弃下线期间内的数据。并且 PubSub 中的数据不支持数据持久化,当 Redis 宕机恢复后,其他类型的数据都可以从 RDB 和 AOF 中恢复回来,但 PubSub 不行。因此,它只提供了简单的基于内存的多播机制。
查看订阅信息
客户端可以通过 PUBSUB 命令来查看频道或者模式的相关信息,比如某个频道目前有多少订阅者等信息。它有如下三个子命令:
PUBSUB CHANNELS [pattern]
该命令返回服务器当前被订阅的频道,该命令是通过遍历服务器的 pubsubchannels _字典的所有键,然后记录并返回所有符合条件的频道来实现的。
PUBSUB NUMSUB [channel [channel ...]]
该命令接收任意多个频道作为输入参数,并返回这些频道的订阅者数量。该命令是通过在 pubsubchannels _字典中找到频道对应的订阅者链表,然后返回订阅者链表的长度来实现的。
PUBSUB NUMPAT
该命令返回服务器当前被订阅模式的数量,该命令是通过返回 pubsubpatterns _链表的长度来实现的。
基于 Streams 的消息队列
Redis 从 5.0 版本开始提供了 Streams 数据类型。Streams 数据类型是 Redis 专门为了消息队列而设计的数据类型,它弥补了 PUB/SUB 不能持久化消息的缺陷,并提供了丰富的消息队列操作命令。
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING:用来查询每个消费组内所有消费者已读取但尚未确认的消息;
- XACK:用于向消息队列确认消息处理已完成。
Redis Stream 数据结构如下图所示:它有一个消息链表,会将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 XADD 命令追加消息时自动创建。每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组在创建时需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 就用来初始化 last_delivered_id 变量。
每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 的消息会被每个消费组都消费到。同一个消费组可以挂多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。
消费者内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
1. XADD
XADD 命令可以往消息队列中插入新消息,消息的格式是键-值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。比如,执行以下命令可以往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。其中,消息队列名称后面的 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如 1599203861727-0。当然,我们也可以不用 ,直接在消息队列名称后自行设定一个 ID 号,只要保证这个 ID 号是全局唯一的就行。不过,相比自行设定 ID 号,使用 * 会更加方便高效。
XADD mqstream * repo 5
"1599203861727-0"
可以看到,消息的全局唯一 ID 由两部分组成,第一部分 “1599203861727” 是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。
2. XREAD
当消费者需要读取消息时,可以直接使用 XREAD 命令从消息队列中读取。XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。例如,我们可以执行下面的命令,表示从 ID 号为 1599203861727-0 的消息开始,读取后续的所有消息(示例中一共 3 条)。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
另外,消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。
举个例子,我们来看一下下面的命令,其中,命令最后的 “$” 符号表示读取最新的消息,同时,我们设置了 block 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒)然后再返回。下面命令中的 XREAD 执行后,消息队列 mqstream 中一直没有消息,所以,XREAD 在 10 秒后返回空值(nil)。
XREAD block 10000 streams mqstream $
(nil)
(10.00s)
3. 消费组
Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
例如,我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream。
XGROUP create mqstream group1 0
OK
然后,我们再执行一段命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,其中,命令最后的参数 “>” 表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共 4 条)。
XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了,如下所示:
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams 消息已经处理完成。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。
XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
如果我们还需要进一步查看某个消费者具体读取了哪些数据,可以执行下面的命令:
XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
2) "consumer2"
3) (integer) 513336
4) (integer) 1
可以看到,consumer2 已读取的消息的 ID 是 1599274912765-0。
一旦消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)