一,发布订阅模式

再来看消息发布订阅的源代码之前,先来聊聊发布订阅的应用场景和使用方式。

1.列表的局限性

通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:

  1. 如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。

  2. 消息的实时性降低。

list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。

  1. blpop queue 5
  1. 基于list实现的消息队列,不支持一对多的消息分发。

2.发布订阅模式

除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。

2.1 订阅频道

首先,我们有很多的频道(channel),我们也可以把这个频道理解成 queue。订阅者可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。

需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。

订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。

  1. subscribe channel-1 channel-2 channel-3

发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):

  1. publish channel-1 2673

取消订阅(不能在未订阅状态下使用):

  1. unsubscribe channel-1

2.2 模糊订阅

支持*#占位符*代表一个字符#代表 0 个或者多个字符。

消费端 1,关注运动信息:

  1. psubscribe *sport

消费端 2,关注所有新闻:

  1. psubscribe news*

消费端 3,关注天气新闻:

  1. psubscribe news-weather

生产者,发布 3 条信息

  1. publish news-sport yaoming
  2. publish news-music jaychou
  3. publish news-weather rain

image.png

3.源码分析

redis的所有命令的函数声明都在server.c文件的redisCommandTable数组里面,这个前面我们已经说过,再此我们从这里作为入口分析发布订阅的源码。

3.1 订阅

redisCommandTable数组里面我们找到subscribe命令对应的处理函数为subscribeCommand,接下来我们来分析一下这个函数的逻辑:

这里的主要流程就是权限检查通过后,循环调用pubsubSubscribeChannel函数订阅指定的频道,然后设置客户端的状态。

这里有一点需要注意,最后设置的客户端状态有什么用?在server.c文件的processCommand函数中,有一段代码:

  1. /* Only allow a subset of commands in the context of Pub/Sub if the
  2. * connection is in RESP2 mode. With RESP3 there are no limits. */
  3. if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
  4. c->cmd->proc != pingCommand &&
  5. c->cmd->proc != subscribeCommand &&
  6. c->cmd->proc != unsubscribeCommand &&
  7. c->cmd->proc != psubscribeCommand &&
  8. c->cmd->proc != punsubscribeCommand &&
  9. c->cmd->proc != resetCommand) {
  10. rejectCommandFormat(c,
  11. "Can't execute '%s': only (P)SUBSCRIBE / "
  12. "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
  13. c->cmd->name);
  14. return C_OK;
  15. }

这里是一个逻辑判断,就是当client处于pub/sub上下文时,只接收订阅相关命令以及一个ping命令,这就解释了上面subscribeCommand函数中为什么要设置客户端flag字段。

接下来我们分析下pubsubSubscribeChannel函数也就是订阅操作的核心逻辑:

  1. 把指定的channel加入到client的pubsub_channel哈希表中
  2. 加入失败说明本来就已经订阅过了
  3. 加入成功则
    1. 把引用加一
    2. 在server的发布订阅哈希表中查找指定channel
    3. 如果该channel还不存在,则创建
    4. 把client加入到该channel的订阅列表中,尾插
  4. 通知客户端

    1. int pubsubSubscribeChannel(client *c, robj *channel) {
    2. dictEntry *de;
    3. list *clients = NULL;
    4. int retval = 0;
    5. /* 把指定的channel加入到client的pubsub_channel哈希表中,假如失败了,那就说明,哈希表里本来就有了,已经订阅过该频道。 */
    6. if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
    7. retval = 1;
    8. //这里是把该channel加入到client的哈希表中,引用加1
    9. incrRefCount(channel);
    10. /* 在server的发布订阅哈希表中查找指定channel */
    11. de = dictFind(server.pubsub_channels,channel);
    12. //如果该channel还不存在,则创建
    13. if (de == NULL) {
    14. //初始化一个list
    15. clients = listCreate();
    16. /*把channel加入到server的哈希表中,value就是该channel的所有订阅者*/
    17. dictAdd(server.pubsub_channels,channel,clients);
    18. /*channel引用加1*/
    19. incrRefCount(channel);
    20. } else {
    21. clients = dictGetVal(de);
    22. }
    23. /*把client加入到该channel的订阅列表中,尾插*/
    24. listAddNodeTail(clients,c);
    25. }
    26. /* 通知客户端 */
    27. addReplyPubsubSubscribed(c,channel);
    28. return retval;
    29. }

    总结一下,订阅其实就是把指定channel分别加入到client和server的pub/sub哈希表中,然后在server端保存订阅了该channel的所有client列表。
    image.png

    补充两个哈希表的结构。

image.png

3.2 发布

接下来我们再来分析下发布的流程,同样是从redisCommandTable数组里面我们找到publish命令对应的处理函数为publishCommand,publishCommand函数核心的逻辑就在函数pubsubPublishMessage中,每次进行消息发布的时候,都会向普通模式跟模糊匹配模式发布消息。

  1. 取出订阅该channel的所有clients
  2. 如果有客户端指定订阅
    1. 获取client的链表
    2. 由client链表创建它的迭代器
    3. 遍历所有client并发送消息
  3. 开始处理模糊订阅

    1. ….

      1. int pubsubPublishMessage(robj *channel, robj *message) {
      2. int receivers = 0;
      3. dictEntry *de;
      4. dictIterator *di;
      5. listNode *ln;
      6. listIter li;
      7. /* 取出订阅该channel的所有clients */
      8. de = dictFind(server.pubsub_channels,channel);
      9. if (de) {
      10. /*获取client的链表*/
      11. list *list = dictGetVal(de);
      12. listNode *ln;
      13. listIter li;
      14. /*由client链表创建它的迭代器*/
      15. listRewind(list,&li);
      16. /*遍历所有client并发送消息*/
      17. while ((ln = listNext(&li)) != NULL) {
      18. client *c = ln->value;
      19. addReplyPubsubMessage(c,channel,message);
      20. receivers++;
      21. }
      22. }
      23. /* 开始模糊匹配的逻辑处理,模糊匹配使用的是链表而不是哈希表 */
      24. di = dictGetIterator(server.pubsub_patterns_dict);
      25. if (di) {
      26. channel = getDecodedObject(channel);
      27. while((de = dictNext(di)) != NULL) {
      28. robj *pattern = dictGetKey(de);
      29. list *clients = dictGetVal(de);
      30. if (!stringmatchlen((char*)pattern->ptr,
      31. sdslen(pattern->ptr),
      32. (char*)channel->ptr,
      33. sdslen(channel->ptr),0)) continue;
      34. //创建模糊匹配规则的迭代器
      35. listRewind(clients,&li);
      36. /*循环遍历所有的匹配规则,如果匹配成功就发消息*/
      37. while ((ln = listNext(&li)) != NULL) {
      38. client *c = listNodeValue(ln);
      39. addReplyPubsubPatMessage(c,pattern,channel,message);
      40. receivers++;
      41. }
      42. }
      43. decrRefCount(channel);
      44. dictReleaseIterator(di);
      45. }
      46. return receivers;
      47. }

      模糊匹配和精准匹配大体上差不多,不过涉及到一个pattern,下面再分析。
      image.png

      3.3 模糊订阅

接着上面我们来分析下模糊订阅的处理流程:从redisCommandTable数组里面我们找到psubscribe命令对应的处理函数为psubscribeCommand,在通过权限校验后循环调用pubsubScribePattern函数订阅client指定的pattern,全部订阅完毕后,修改客户端的状态。

  1. void psubscribeCommand(client *c) {
  2. int j;
  3. if (pubsubCheckACLPermissionsOrReply(c,1,c->argc-1,1) != ACL_OK) return;
  4. if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
  5. addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
  6. return;
  7. }
  8. /*循环订阅client指定的pattern*/
  9. for (j = 1; j < c->argc; j++)
  10. pubsubSubscribePattern(c,c->argv[j]);
  11. /*修改client的状态*/
  12. c->flags |= CLIENT_PUBSUB;
  13. }

接下来我们来看一下pubsubScribePattern函数也就是订阅客户端指定的分区逻辑:

  1. 判断client是否已经订阅该pattern
  2. 如果没有订阅过
    1. 把指定pattern加入到client的pattern链表中
    2. 引用计数+1
    3. 创建一个pattern对象,并指向该client,加入到server的pattern链表中
  3. 通知客户端
  1. int pubsubSubscribePattern(client *c, robj *pattern) {
  2. dictEntry *de;
  3. list *clients;
  4. int retval = 0;
  5. /*判断client是否已经订阅该pattern,这里与普通模式不同,是个链表*/
  6. if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
  7. retval = 1;
  8. pubsubPattern *pat;
  9. /*把指定pattern加入到client的pattern链表中*/
  10. listAddNodeTail(c->pubsub_patterns,pattern);
  11. /*引用计数+1*/
  12. incrRefCount(pattern);
  13. /*
  14. * 创建一个pattern对象,并指向该client,加入到server的pattern链表中
  15. * 可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同
  16. *
  17. * 注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,
  18. * 也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。
  19. * */
  20. pat = zmalloc(sizeof(*pat));
  21. pat->pattern = getDecodedObject(pattern);
  22. pat->client = c;
  23. listAddNodeTail(server.pubsub_patterns,pat);
  24. /* Add the client to the pattern -> list of clients hash table */
  25. de = dictFind(server.pubsub_patterns_dict,pattern);
  26. if (de == NULL) {
  27. clients = listCreate();
  28. dictAdd(server.pubsub_patterns_dict,pattern,clients);
  29. incrRefCount(pattern);
  30. } else {
  31. clients = dictGetVal(de);
  32. }
  33. listAddNodeTail(clients,c);
  34. }
  35. /* 通知客户端 */
  36. addReplyPubsubPatSubscribed(c,pattern);
  37. return retval;
  38. }

3.4 取消订阅

取消订阅的入口在unsubscribeCommand函数中,其实主要的目的无非就是想把上面所保存在server和client端的数据删除。

  1. 判断如果客户端输入的命令没有参数,就调用pubsubUnsubscribeAllChannels函数取消订阅所有channel的逻辑。
  2. 否则,根据客户端传递的参数循环调用pubsubUnsubscribeChannel函数取消指定channel的订阅。
  3. 最后,如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了

    void unsubscribeCommand(client *c) {
     /*如果该命令没有参数,则把channel全部取消*/
     if (c->argc == 1) {
         /*取消订阅所有channel的逻辑*/
         pubsubUnsubscribeAllChannels(c,1);
     } else {
         int j;
         /*循环来取消置顶的channel*/
         for (j = 1; j < c->argc; j++)
             /*取消订阅某一个channel*/
             pubsubUnsubscribeChannel(c,c->argv[j],1);
     }
     /*如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了*/
     if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
    }
    

    先看一下函数pubsubUnsubscribeAllChannels也就是取消所有订阅的逻辑,其实也是一个个取消订阅的。首先取出client端所有的channel,从头开始遍历channel调用pubsubUnsubscribeChannel函数取消订阅。特殊的,即使client上面一个订阅也没有,也会返回成功,最后回收内存,返回取消的订阅数。

    int pubsubUnsubscribeAllChannels(client *c, int notify) {
     /*取出client端所有的channel*/
     dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
     dictEntry *de;
     int count = 0;
    
     while((de = dictNext(di)) != NULL) {
         robj *channel = dictGetKey(de);
         /*一个个取消订阅channel*/
         count += pubsubUnsubscribeChannel(c,channel,notify);
     }
     /* 如果client上面都没有订阅,依然返回响应 */
     if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
     /*内存回收*/
     dictReleaseIterator(di);
     return count;
    }
    

    再看看pubsubUnsubscribeChannel函数也就是取消指定订阅的逻辑:

  4. 从client的哈希表中删除指定channel,如果删除成功:

    1. 删除server端该channel中的指定client
    2. 如果删除完以后channel没有了订阅者,则把channel也删除
  5. 通知客户端
  6. 引用计数-1

    int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
     dictEntry *de;
     list *clients;
     listNode *ln;
     int retval = 0;
    
     /* Remove the channel from the client -> channels hash table */
     incrRefCount(channel); /* channel may be just a pointer to the same object
                             we have in the hash tables. Protect it... */
     /*从client中删除指定channel*/
     if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
         retval = 1;
         /* 删除server端该channel中的指定client */
         de = dictFind(server.pubsub_channels,channel);
         serverAssertWithInfo(c,NULL,de != NULL);
         clients = dictGetVal(de);
         ln = listSearchKey(clients,c);
         serverAssertWithInfo(c,NULL,ln != NULL);
         listDelNode(clients,ln);
         if (listLength(clients) == 0) {
             /* 如果删除完以后channel没有了订阅者,则把channel也删除 */
             dictDelete(server.pubsub_channels,channel);
         }
     }
     /* 通知客户端 */
     if (notify) addReplyPubsubUnsubscribed(c,channel);
     /*引用计数-1*/
     decrRefCount(channel); /* it is finally safe to release it */
     /*普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,
      * 需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。*/
     return retval;
    }
    

    image.png


至此,订阅发布模式的源码主要流程就分析完了,普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历之后一个个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。

4.java接入redis发布订阅

4.1 依赖

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

4.2 配置类

/**
 * @author 二十
 * @since 2022/2/9 4:12 下午
 */
@SpringBootConfiguration
public class Config {

    /**
     * 配置redistemplate
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 设置value的序列化规则和 key的序列化规则
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     *
     * @param redisConnectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        // 订阅多个频道
        redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test1"));
        redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test2"));
        //不同的订阅者
        //redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2"));

        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        redisMessageListenerContainer.setTopicSerializer(seria);
        return redisMessageListenerContainer;
    }


    /**
     * 表示监听一个频道
     * MessageListenerAdapter:监听适配器
     * 需要指定订阅者
     * 这样就配置好了这个订阅者的监听适配器
     *
     * @param receiveListener
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(ReceiveListener receiveListener) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive1 ”
        return new MessageListenerAdapter(receiveListener);
    }

}

4.3 配置文件

#springBoot整合redis
#端口
spring.redis.port=6379
#host
spring.redis.host=
#连接的数据库
spring.redis.database=0
#数据库密码
spring.redis.password=
#超时时间
spring.redis.timeout=1000000
#最大连接数
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间 负数表示没有限制
spring.redis.lettuce.pool.max-wait=-1
#最大空闲连接数
spring.redis.lettuce.pool.max-idle=5
#最小空闲连接数
spring.redis.lettuce.pool.min-idle=0

4.4 监听器

@Slf4j
@Component
public class ReceiveListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        log.info("接收数据:{}", message.toString());
        log.info("订阅频道:{}", new String(message.getChannel()));
    }
}

4.5 测试发送

/**
 * @author 二十
 * @since 2022/2/9 4:12 下午
 */
@SpringBootTest(classes = RedisMain.class)
@Slf4j
public class RedisListenerTest {

    @Resource
    private RedisTemplate<String,Object> redisTemplate;

    @Test
     void contextLoads() {
       log.info("执行发布");
        redisTemplate.convertAndSend("test1","Hello,I'm Tom!");
    }
}

二,键空间事件通知

1. 配置redis键空间事件通知

redis2.8.0版本之后推出了键空间事件通知,如何使用呢?当redis的key被删除时,redis会发送两种不同类型的事件,特定的事件会往特定的频道发送通知,我们只要订阅这个特定的频道等待通知即可。

两种不同类型的事件:

PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
  • 以keyspace为前缀的频道被称为键空间通知(key-space notification),订阅这个频道 keyspace@0:mykey,可以接收0号数据库中所有修改键 mykey 的事件,订阅者将接收到被执行的事件的名字,就是 del;

  • 以keyevent为前缀的频道则称为键事件通知(key-event notification),订阅这个频道 keyevent@0:del,则可以接收0号数据库中所有执行 del 命令的键,订阅者将接收到被执行事件的键的名字,就是 mykey。

我们有两种方式开启键空间事件通知功能,或者只接受特定类型的通知:

  1. 通过修改redis.conf配置文件

    # 默认为空,表示不开启键空间通知功能
    notify-keyspace-events ""
    
  2. 通过CONFIG SET命令来设定notify-keyspace-events参数

    # xx代表订阅事件的类型
    CONFIG SET notify-keyspace-events xx
    

    当服务器开启键空间事件通知功能时,需要指定事件的类型,即开启哪些特定类型的通知。在server.h头文件中,Redis设定了一系列的宏定义,用来标识事件的类型。其中,每一个宏定义代表的事件类型如下:

事件代号 事件类型
K 键空间通知,所有通知以keyspace@为前缀
E 键事件通知,所有通知以keyevent@为前缀
g DEL、EXPIRE、RENAME等类型无关的通用命令
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐事件:每当有键因为maxmemory政策而被删除时发送
A 参数g$lshzxe的别名,代表全部上述全部命令

关于notify-keyspace-events的设定,输入参数必须至少要有一个K或者E,用来标识该通知是键空间还是键事件;如果不包含,不管其余参数为什么,都将不会有任何通知被分发。

2.源码实现

键空间事件通知的源码实现仅仅只有三个函数,这三个函数声明在server.h头文件中:

/* 键空间事件通知 */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
//将Notify设置参数由字符串转换成标识量flag
int keyspaceEventsStringToFlags(char *classes);
//将Notify设置参数由标识量flags转换成字符串
sds keyspaceEventsFlagsToString(int flags);
  • keyspaceEventsStringToFlags 函数其实就是遍历每一个字符,做或映射。
  • keyspaceEventsFlagsToString函数就是根据传过来的flag参数判断是哪种宏定义,然后在判断是键空间通知还是键事件通知【如果都不是,在sds最后拼接一个字符m】,返回结果。
  • notifyKeyspaceEvent函数其实是利用Redis的订阅发布实现的键空间事件通知,当数据库中的键发生改变且服务器开启了相应的事件类型通知时,Redis就会发送键事件通知,通过pub/sub命令来告知客户端此刻数据库中的修改操作。

    3.Notify应用

开启两个客户端:

  • 在客户端一执行:PSUBSCRIBE __keyevent*,这样就开始订阅了符合模式串__keyevent*的事件。
  • 在客户端二执行:CONFIG SET notify-keyspace-events KEA,设置开启键空间事件通知,然后运行SET命令,这时客户端一就可以接收到这个事件。

image.pngimage.png


Java代码实现Redis键空间失效通知:整个流程其实就是提前在项目里面配置好key过期的监听器,一旦redis中的某个key失效,就可以通过监听器感知到,然后执行相应的业务逻辑。

配置类

@SpringBootConfiguration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(LettuceConnectionFactory lettuceConnectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(lettuceConnectionFactory);
        return container;
    }
}

�Redis键空间失效通知监听器

@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {

        String token = message.toString();

        log.error("token:{}", token);
    }
}

测试用例

@SpringBootTest(classes = RedisMain.class)
@Slf4j
class RedisListenerTest {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Test
    void contextLoads() throws Exception {
        log.info("执行发布");
        redisTemplate.opsForValue().setIfAbsent("yhd-keyspace-notify", "二十", 3l, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(5);
    }
}