一,发布订阅模式
再来看消息发布订阅的源代码之前,先来聊聊发布订阅的应用场景和使用方式。
1.列表的局限性
通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:
如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。
消息的实时性降低。
list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。
blpop queue 5
- 基于list实现的消息队列,不支持一对多的消息分发。
2.发布订阅模式
除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。
2.1 订阅频道
首先,我们有很多的频道(channel),我们也可以把这个频道理解成 queue。订阅者可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。
需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。
订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。
subscribe channel-1 channel-2 channel-3
发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):
publish channel-1 2673
取消订阅(不能在未订阅状态下使用):
unsubscribe channel-1
2.2 模糊订阅
支持*
和#
占位符。*
代表一个字符,#
代表 0 个或者多个字符。
消费端 1,关注运动信息:
psubscribe *sport
消费端 2,关注所有新闻:
psubscribe news*
消费端 3,关注天气新闻:
psubscribe news-weather
生产者,发布 3 条信息
publish news-sport yaoming
publish news-music jaychou
publish news-weather rain
3.源码分析
redis的所有命令的函数声明都在server.c文件的redisCommandTable数组里面,这个前面我们已经说过,再此我们从这里作为入口分析发布订阅的源码。
3.1 订阅
从redisCommandTable数组里面我们找到subscribe命令对应的处理函数为subscribeCommand,接下来我们来分析一下这个函数的逻辑:
这里的主要流程就是权限检查通过后,循环调用pubsubSubscribeChannel函数订阅指定的频道,然后设置客户端的状态。
这里有一点需要注意,最后设置的客户端状态有什么用?在server.c文件的processCommand
函数中,有一段代码:
/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->name);
return C_OK;
}
这里是一个逻辑判断,就是当client处于pub/sub
上下文时,只接收订阅相关命令以及一个ping命令,这就解释了上面subscribeCommand
函数中为什么要设置客户端flag字段。
接下来我们分析下pubsubSubscribeChannel函数也就是订阅操作的核心逻辑:
- 把指定的channel加入到client的pubsub_channel哈希表中
- 加入失败说明本来就已经订阅过了
- 加入成功则
- 把引用加一
- 在server的发布订阅哈希表中查找指定channel
- 如果该channel还不存在,则创建
- 把client加入到该channel的订阅列表中,尾插
通知客户端
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* 把指定的channel加入到client的pubsub_channel哈希表中,假如失败了,那就说明,哈希表里本来就有了,已经订阅过该频道。 */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
//这里是把该channel加入到client的哈希表中,引用加1
incrRefCount(channel);
/* 在server的发布订阅哈希表中查找指定channel */
de = dictFind(server.pubsub_channels,channel);
//如果该channel还不存在,则创建
if (de == NULL) {
//初始化一个list
clients = listCreate();
/*把channel加入到server的哈希表中,value就是该channel的所有订阅者*/
dictAdd(server.pubsub_channels,channel,clients);
/*channel引用加1*/
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
/*把client加入到该channel的订阅列表中,尾插*/
listAddNodeTail(clients,c);
}
/* 通知客户端 */
addReplyPubsubSubscribed(c,channel);
return retval;
}
总结一下,订阅其实就是把指定channel分别加入到client和server的pub/sub哈希表中,然后在server端保存订阅了该channel的所有client列表。
补充两个哈希表的结构。
3.2 发布
接下来我们再来分析下发布的流程,同样是从redisCommandTable数组里面我们找到publish命令对应的处理函数为publishCommand,而publishCommand函数核心的逻辑就在函数pubsubPublishMessage
中,每次进行消息发布的时候,都会向普通模式跟模糊匹配模式发布消息。
- 取出订阅该channel的所有clients
- 如果有客户端指定订阅
- 获取client的链表
- 由client链表创建它的迭代器
- 遍历所有client并发送消息
开始处理模糊订阅
….
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
/* 取出订阅该channel的所有clients */
de = dictFind(server.pubsub_channels,channel);
if (de) {
/*获取client的链表*/
list *list = dictGetVal(de);
listNode *ln;
listIter li;
/*由client链表创建它的迭代器*/
listRewind(list,&li);
/*遍历所有client并发送消息*/
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
receivers++;
}
}
/* 开始模糊匹配的逻辑处理,模糊匹配使用的是链表而不是哈希表 */
di = dictGetIterator(server.pubsub_patterns_dict);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
//创建模糊匹配规则的迭代器
listRewind(clients,&li);
/*循环遍历所有的匹配规则,如果匹配成功就发消息*/
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}
模糊匹配和精准匹配大体上差不多,不过涉及到一个pattern,下面再分析。
3.3 模糊订阅
接着上面我们来分析下模糊订阅的处理流程:从redisCommandTable数组里面我们找到psubscribe命令对应的处理函数为psubscribeCommand,在通过权限校验后循环调用pubsubScribePattern函数订阅client指定的pattern,全部订阅完毕后,修改客户端的状态。
void psubscribeCommand(client *c) {
int j;
if (pubsubCheckACLPermissionsOrReply(c,1,c->argc-1,1) != ACL_OK) return;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
/*循环订阅client指定的pattern*/
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
/*修改client的状态*/
c->flags |= CLIENT_PUBSUB;
}
接下来我们来看一下pubsubScribePattern函数也就是订阅客户端指定的分区逻辑:
- 判断client是否已经订阅该pattern
- 如果没有订阅过
- 把指定pattern加入到client的pattern链表中
- 引用计数+1
- 创建一个pattern对象,并指向该client,加入到server的pattern链表中
- 通知客户端
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
list *clients;
int retval = 0;
/*判断client是否已经订阅该pattern,这里与普通模式不同,是个链表*/
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
/*把指定pattern加入到client的pattern链表中*/
listAddNodeTail(c->pubsub_patterns,pattern);
/*引用计数+1*/
incrRefCount(pattern);
/*
* 创建一个pattern对象,并指向该client,加入到server的pattern链表中
* 可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同
*
* 注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,
* 也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。
* */
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns_dict,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* 通知客户端 */
addReplyPubsubPatSubscribed(c,pattern);
return retval;
}
3.4 取消订阅
取消订阅的入口在unsubscribeCommand函数中,其实主要的目的无非就是想把上面所保存在server和client端的数据删除。
- 判断如果客户端输入的命令没有参数,就调用pubsubUnsubscribeAllChannels函数取消订阅所有channel的逻辑。
- 否则,根据客户端传递的参数循环调用pubsubUnsubscribeChannel函数取消指定channel的订阅。
最后,如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了
void unsubscribeCommand(client *c) { /*如果该命令没有参数,则把channel全部取消*/ if (c->argc == 1) { /*取消订阅所有channel的逻辑*/ pubsubUnsubscribeAllChannels(c,1); } else { int j; /*循环来取消置顶的channel*/ for (j = 1; j < c->argc; j++) /*取消订阅某一个channel*/ pubsubUnsubscribeChannel(c,c->argv[j],1); } /*如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了*/ if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; }
先看一下函数pubsubUnsubscribeAllChannels也就是取消所有订阅的逻辑,其实也是一个个取消订阅的。首先取出client端所有的channel,从头开始遍历channel调用pubsubUnsubscribeChannel函数取消订阅。特殊的,即使client上面一个订阅也没有,也会返回成功,最后回收内存,返回取消的订阅数。
int pubsubUnsubscribeAllChannels(client *c, int notify) { /*取出client端所有的channel*/ dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; while((de = dictNext(di)) != NULL) { robj *channel = dictGetKey(de); /*一个个取消订阅channel*/ count += pubsubUnsubscribeChannel(c,channel,notify); } /* 如果client上面都没有订阅,依然返回响应 */ if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL); /*内存回收*/ dictReleaseIterator(di); return count; }
再看看pubsubUnsubscribeChannel函数也就是取消指定订阅的逻辑:
从client的哈希表中删除指定channel,如果删除成功:
- 删除server端该channel中的指定client
- 如果删除完以后channel没有了订阅者,则把channel也删除
- 通知客户端
引用计数-1
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ /*从client中删除指定channel*/ if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* 删除server端该channel中的指定client */ de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* 如果删除完以后channel没有了订阅者,则把channel也删除 */ dictDelete(server.pubsub_channels,channel); } } /* 通知客户端 */ if (notify) addReplyPubsubUnsubscribed(c,channel); /*引用计数-1*/ decrRefCount(channel); /* it is finally safe to release it */ /*普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配, * 需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。*/ return retval; }
至此,订阅发布模式的源码主要流程就分析完了,普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历之后一个个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。
4.java接入redis发布订阅
4.1 依赖
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
4.2 配置类
/**
* @author 二十
* @since 2022/2/9 4:12 下午
*/
@SpringBootConfiguration
public class Config {
/**
* 配置redistemplate
*
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param redisConnectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
// 订阅多个频道
redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test1"));
redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test2"));
//不同的订阅者
//redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2"));
//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
redisMessageListenerContainer.setTopicSerializer(seria);
return redisMessageListenerContainer;
}
/**
* 表示监听一个频道
* MessageListenerAdapter:监听适配器
* 需要指定订阅者
* 这样就配置好了这个订阅者的监听适配器
*
* @param receiveListener
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(ReceiveListener receiveListener) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive1 ”
return new MessageListenerAdapter(receiveListener);
}
}
4.3 配置文件
#springBoot整合redis
#端口
spring.redis.port=6379
#host
spring.redis.host=
#连接的数据库
spring.redis.database=0
#数据库密码
spring.redis.password=
#超时时间
spring.redis.timeout=1000000
#最大连接数
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间 负数表示没有限制
spring.redis.lettuce.pool.max-wait=-1
#最大空闲连接数
spring.redis.lettuce.pool.max-idle=5
#最小空闲连接数
spring.redis.lettuce.pool.min-idle=0
4.4 监听器
@Slf4j
@Component
public class ReceiveListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("接收数据:{}", message.toString());
log.info("订阅频道:{}", new String(message.getChannel()));
}
}
4.5 测试发送
/**
* @author 二十
* @since 2022/2/9 4:12 下午
*/
@SpringBootTest(classes = RedisMain.class)
@Slf4j
public class RedisListenerTest {
@Resource
private RedisTemplate<String,Object> redisTemplate;
@Test
void contextLoads() {
log.info("执行发布");
redisTemplate.convertAndSend("test1","Hello,I'm Tom!");
}
}
二,键空间事件通知
1. 配置redis键空间事件通知
redis2.8.0版本之后推出了键空间事件通知,如何使用呢?当redis的key被删除时,redis会发送两种不同类型的事件,特定的事件会往特定的频道发送通知,我们只要订阅这个特定的频道等待通知即可。
两种不同类型的事件:
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
以keyspace为前缀的频道被称为键空间通知(key-space notification),订阅这个频道 keyspace@0:mykey,可以接收0号数据库中所有修改键 mykey 的事件,订阅者将接收到被执行的事件的名字,就是 del;
以keyevent为前缀的频道则称为键事件通知(key-event notification),订阅这个频道 keyevent@0:del,则可以接收0号数据库中所有执行 del 命令的键,订阅者将接收到被执行事件的键的名字,就是 mykey。
我们有两种方式开启键空间事件通知功能,或者只接受特定类型的通知:
通过修改redis.conf配置文件
# 默认为空,表示不开启键空间通知功能 notify-keyspace-events ""
通过CONFIG SET命令来设定notify-keyspace-events参数
# xx代表订阅事件的类型 CONFIG SET notify-keyspace-events xx
当服务器开启键空间事件通知功能时,需要指定事件的类型,即开启哪些特定类型的通知。在server.h头文件中,Redis设定了一系列的宏定义,用来标识事件的类型。其中,每一个宏定义代表的事件类型如下:
事件代号 | 事件类型 |
---|---|
K | 键空间通知,所有通知以keyspace@为前缀 |
E | 键事件通知,所有通知以keyevent@为前缀 |
g | DEL、EXPIRE、RENAME等类型无关的通用命令 |
$ | 字符串命令的通知 |
l | 列表命令的通知 |
s | 集合命令的通知 |
h | 哈希命令的通知 |
z | 有序集合命令的通知 |
x | 过期事件:每当有过期键被删除时发送 |
e | 驱逐事件:每当有键因为maxmemory政策而被删除时发送 |
A | 参数g$lshzxe 的别名,代表全部上述全部命令 |
关于notify-keyspace-events的设定,输入参数必须至少要有一个K或者E,用来标识该通知是键空间还是键事件;如果不包含,不管其余参数为什么,都将不会有任何通知被分发。
2.源码实现
键空间事件通知的源码实现仅仅只有三个函数,这三个函数声明在server.h头文件中:
/* 键空间事件通知 */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
//将Notify设置参数由字符串转换成标识量flag
int keyspaceEventsStringToFlags(char *classes);
//将Notify设置参数由标识量flags转换成字符串
sds keyspaceEventsFlagsToString(int flags);
- keyspaceEventsStringToFlags 函数其实就是遍历每一个字符,做或映射。
- keyspaceEventsFlagsToString函数就是根据传过来的flag参数判断是哪种宏定义,然后在判断是键空间通知还是键事件通知【如果都不是,在sds最后拼接一个字符m】,返回结果。
- notifyKeyspaceEvent函数其实是利用Redis的订阅发布实现的键空间事件通知,当数据库中的键发生改变且服务器开启了相应的事件类型通知时,Redis就会发送键事件通知,通过pub/sub命令来告知客户端此刻数据库中的修改操作。
3.Notify应用
开启两个客户端:
- 在客户端一执行:
PSUBSCRIBE __keyevent*
,这样就开始订阅了符合模式串__keyevent*
的事件。 - 在客户端二执行:
CONFIG SET notify-keyspace-events KEA
,设置开启键空间事件通知,然后运行SET命令,这时客户端一就可以接收到这个事件。
Java代码实现Redis键空间失效通知:整个流程其实就是提前在项目里面配置好key过期的监听器,一旦redis中的某个key失效,就可以通过监听器感知到,然后执行相应的业务逻辑。
配置类
@SpringBootConfiguration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(LettuceConnectionFactory lettuceConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(lettuceConnectionFactory);
return container;
}
}
�Redis键空间失效通知监听器
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String token = message.toString();
log.error("token:{}", token);
}
}
测试用例
@SpringBootTest(classes = RedisMain.class)
@Slf4j
class RedisListenerTest {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Test
void contextLoads() throws Exception {
log.info("执行发布");
redisTemplate.opsForValue().setIfAbsent("yhd-keyspace-notify", "二十", 3l, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(5);
}
}