一,发布订阅模式
再来看消息发布订阅的源代码之前,先来聊聊发布订阅的应用场景和使用方式。
1.列表的局限性
通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:
如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。
消息的实时性降低。
list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。
blpop queue 5
- 基于list实现的消息队列,不支持一对多的消息分发。
2.发布订阅模式
除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。
2.1 订阅频道
首先,我们有很多的频道(channel),我们也可以把这个频道理解成 queue。订阅者可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。
需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。
订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。
subscribe channel-1 channel-2 channel-3
发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):
publish channel-1 2673
取消订阅(不能在未订阅状态下使用):
unsubscribe channel-1
2.2 模糊订阅
支持*和#占位符。*代表一个字符,#代表 0 个或者多个字符。
消费端 1,关注运动信息:
psubscribe *sport
消费端 2,关注所有新闻:
psubscribe news*
消费端 3,关注天气新闻:
psubscribe news-weather
生产者,发布 3 条信息
publish news-sport yaomingpublish news-music jaychoupublish news-weather rain
3.源码分析
redis的所有命令的函数声明都在server.c文件的redisCommandTable数组里面,这个前面我们已经说过,再此我们从这里作为入口分析发布订阅的源码。
3.1 订阅
从redisCommandTable数组里面我们找到subscribe命令对应的处理函数为subscribeCommand,接下来我们来分析一下这个函数的逻辑:
这里的主要流程就是权限检查通过后,循环调用pubsubSubscribeChannel函数订阅指定的频道,然后设置客户端的状态。
这里有一点需要注意,最后设置的客户端状态有什么用?在server.c文件的processCommand函数中,有一段代码:
/* Only allow a subset of commands in the context of Pub/Sub if the* connection is in RESP2 mode. With RESP3 there are no limits. */if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&c->cmd->proc != pingCommand &&c->cmd->proc != subscribeCommand &&c->cmd->proc != unsubscribeCommand &&c->cmd->proc != psubscribeCommand &&c->cmd->proc != punsubscribeCommand &&c->cmd->proc != resetCommand) {rejectCommandFormat(c,"Can't execute '%s': only (P)SUBSCRIBE / ""(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",c->cmd->name);return C_OK;}
这里是一个逻辑判断,就是当client处于pub/sub上下文时,只接收订阅相关命令以及一个ping命令,这就解释了上面subscribeCommand函数中为什么要设置客户端flag字段。
接下来我们分析下pubsubSubscribeChannel函数也就是订阅操作的核心逻辑:
- 把指定的channel加入到client的pubsub_channel哈希表中
- 加入失败说明本来就已经订阅过了
- 加入成功则
- 把引用加一
- 在server的发布订阅哈希表中查找指定channel
- 如果该channel还不存在,则创建
- 把client加入到该channel的订阅列表中,尾插
通知客户端
int pubsubSubscribeChannel(client *c, robj *channel) {dictEntry *de;list *clients = NULL;int retval = 0;/* 把指定的channel加入到client的pubsub_channel哈希表中,假如失败了,那就说明,哈希表里本来就有了,已经订阅过该频道。 */if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;//这里是把该channel加入到client的哈希表中,引用加1incrRefCount(channel);/* 在server的发布订阅哈希表中查找指定channel */de = dictFind(server.pubsub_channels,channel);//如果该channel还不存在,则创建if (de == NULL) {//初始化一个listclients = listCreate();/*把channel加入到server的哈希表中,value就是该channel的所有订阅者*/dictAdd(server.pubsub_channels,channel,clients);/*channel引用加1*/incrRefCount(channel);} else {clients = dictGetVal(de);}/*把client加入到该channel的订阅列表中,尾插*/listAddNodeTail(clients,c);}/* 通知客户端 */addReplyPubsubSubscribed(c,channel);return retval;}
总结一下,订阅其实就是把指定channel分别加入到client和server的pub/sub哈希表中,然后在server端保存订阅了该channel的所有client列表。

补充两个哈希表的结构。
3.2 发布
接下来我们再来分析下发布的流程,同样是从redisCommandTable数组里面我们找到publish命令对应的处理函数为publishCommand,而publishCommand函数核心的逻辑就在函数pubsubPublishMessage中,每次进行消息发布的时候,都会向普通模式跟模糊匹配模式发布消息。
- 取出订阅该channel的所有clients
- 如果有客户端指定订阅
- 获取client的链表
- 由client链表创建它的迭代器
- 遍历所有client并发送消息
开始处理模糊订阅
….
int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;dictEntry *de;dictIterator *di;listNode *ln;listIter li;/* 取出订阅该channel的所有clients */de = dictFind(server.pubsub_channels,channel);if (de) {/*获取client的链表*/list *list = dictGetVal(de);listNode *ln;listIter li;/*由client链表创建它的迭代器*/listRewind(list,&li);/*遍历所有client并发送消息*/while ((ln = listNext(&li)) != NULL) {client *c = ln->value;addReplyPubsubMessage(c,channel,message);receivers++;}}/* 开始模糊匹配的逻辑处理,模糊匹配使用的是链表而不是哈希表 */di = dictGetIterator(server.pubsub_patterns_dict);if (di) {channel = getDecodedObject(channel);while((de = dictNext(di)) != NULL) {robj *pattern = dictGetKey(de);list *clients = dictGetVal(de);if (!stringmatchlen((char*)pattern->ptr,sdslen(pattern->ptr),(char*)channel->ptr,sdslen(channel->ptr),0)) continue;//创建模糊匹配规则的迭代器listRewind(clients,&li);/*循环遍历所有的匹配规则,如果匹配成功就发消息*/while ((ln = listNext(&li)) != NULL) {client *c = listNodeValue(ln);addReplyPubsubPatMessage(c,pattern,channel,message);receivers++;}}decrRefCount(channel);dictReleaseIterator(di);}return receivers;}
模糊匹配和精准匹配大体上差不多,不过涉及到一个pattern,下面再分析。
3.3 模糊订阅
接着上面我们来分析下模糊订阅的处理流程:从redisCommandTable数组里面我们找到psubscribe命令对应的处理函数为psubscribeCommand,在通过权限校验后循环调用pubsubScribePattern函数订阅client指定的pattern,全部订阅完毕后,修改客户端的状态。
void psubscribeCommand(client *c) {int j;if (pubsubCheckACLPermissionsOrReply(c,1,c->argc-1,1) != ACL_OK) return;if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");return;}/*循环订阅client指定的pattern*/for (j = 1; j < c->argc; j++)pubsubSubscribePattern(c,c->argv[j]);/*修改client的状态*/c->flags |= CLIENT_PUBSUB;}
接下来我们来看一下pubsubScribePattern函数也就是订阅客户端指定的分区逻辑:
- 判断client是否已经订阅该pattern
- 如果没有订阅过
- 把指定pattern加入到client的pattern链表中
- 引用计数+1
- 创建一个pattern对象,并指向该client,加入到server的pattern链表中
- 通知客户端
int pubsubSubscribePattern(client *c, robj *pattern) {dictEntry *de;list *clients;int retval = 0;/*判断client是否已经订阅该pattern,这里与普通模式不同,是个链表*/if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {retval = 1;pubsubPattern *pat;/*把指定pattern加入到client的pattern链表中*/listAddNodeTail(c->pubsub_patterns,pattern);/*引用计数+1*/incrRefCount(pattern);/** 创建一个pattern对象,并指向该client,加入到server的pattern链表中* 可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同** 注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,* 也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。* */pat = zmalloc(sizeof(*pat));pat->pattern = getDecodedObject(pattern);pat->client = c;listAddNodeTail(server.pubsub_patterns,pat);/* Add the client to the pattern -> list of clients hash table */de = dictFind(server.pubsub_patterns_dict,pattern);if (de == NULL) {clients = listCreate();dictAdd(server.pubsub_patterns_dict,pattern,clients);incrRefCount(pattern);} else {clients = dictGetVal(de);}listAddNodeTail(clients,c);}/* 通知客户端 */addReplyPubsubPatSubscribed(c,pattern);return retval;}
3.4 取消订阅
取消订阅的入口在unsubscribeCommand函数中,其实主要的目的无非就是想把上面所保存在server和client端的数据删除。
- 判断如果客户端输入的命令没有参数,就调用pubsubUnsubscribeAllChannels函数取消订阅所有channel的逻辑。
- 否则,根据客户端传递的参数循环调用pubsubUnsubscribeChannel函数取消指定channel的订阅。
最后,如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了
void unsubscribeCommand(client *c) { /*如果该命令没有参数,则把channel全部取消*/ if (c->argc == 1) { /*取消订阅所有channel的逻辑*/ pubsubUnsubscribeAllChannels(c,1); } else { int j; /*循环来取消置顶的channel*/ for (j = 1; j < c->argc; j++) /*取消订阅某一个channel*/ pubsubUnsubscribeChannel(c,c->argv[j],1); } /*如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了*/ if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; }先看一下函数pubsubUnsubscribeAllChannels也就是取消所有订阅的逻辑,其实也是一个个取消订阅的。首先取出client端所有的channel,从头开始遍历channel调用pubsubUnsubscribeChannel函数取消订阅。特殊的,即使client上面一个订阅也没有,也会返回成功,最后回收内存,返回取消的订阅数。
int pubsubUnsubscribeAllChannels(client *c, int notify) { /*取出client端所有的channel*/ dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; while((de = dictNext(di)) != NULL) { robj *channel = dictGetKey(de); /*一个个取消订阅channel*/ count += pubsubUnsubscribeChannel(c,channel,notify); } /* 如果client上面都没有订阅,依然返回响应 */ if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL); /*内存回收*/ dictReleaseIterator(di); return count; }再看看pubsubUnsubscribeChannel函数也就是取消指定订阅的逻辑:
从client的哈希表中删除指定channel,如果删除成功:
- 删除server端该channel中的指定client
- 如果删除完以后channel没有了订阅者,则把channel也删除
- 通知客户端
引用计数-1
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ /*从client中删除指定channel*/ if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* 删除server端该channel中的指定client */ de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* 如果删除完以后channel没有了订阅者,则把channel也删除 */ dictDelete(server.pubsub_channels,channel); } } /* 通知客户端 */ if (notify) addReplyPubsubUnsubscribed(c,channel); /*引用计数-1*/ decrRefCount(channel); /* it is finally safe to release it */ /*普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配, * 需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。*/ return retval; }
至此,订阅发布模式的源码主要流程就分析完了,普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历之后一个个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。
4.java接入redis发布订阅
4.1 依赖
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
4.2 配置类
/**
* @author 二十
* @since 2022/2/9 4:12 下午
*/
@SpringBootConfiguration
public class Config {
/**
* 配置redistemplate
*
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param redisConnectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
// 订阅多个频道
redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test1"));
redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("test2"));
//不同的订阅者
//redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2"));
//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
redisMessageListenerContainer.setTopicSerializer(seria);
return redisMessageListenerContainer;
}
/**
* 表示监听一个频道
* MessageListenerAdapter:监听适配器
* 需要指定订阅者
* 这样就配置好了这个订阅者的监听适配器
*
* @param receiveListener
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(ReceiveListener receiveListener) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive1 ”
return new MessageListenerAdapter(receiveListener);
}
}
4.3 配置文件
#springBoot整合redis
#端口
spring.redis.port=6379
#host
spring.redis.host=
#连接的数据库
spring.redis.database=0
#数据库密码
spring.redis.password=
#超时时间
spring.redis.timeout=1000000
#最大连接数
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间 负数表示没有限制
spring.redis.lettuce.pool.max-wait=-1
#最大空闲连接数
spring.redis.lettuce.pool.max-idle=5
#最小空闲连接数
spring.redis.lettuce.pool.min-idle=0
4.4 监听器
@Slf4j
@Component
public class ReceiveListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("接收数据:{}", message.toString());
log.info("订阅频道:{}", new String(message.getChannel()));
}
}
4.5 测试发送
/**
* @author 二十
* @since 2022/2/9 4:12 下午
*/
@SpringBootTest(classes = RedisMain.class)
@Slf4j
public class RedisListenerTest {
@Resource
private RedisTemplate<String,Object> redisTemplate;
@Test
void contextLoads() {
log.info("执行发布");
redisTemplate.convertAndSend("test1","Hello,I'm Tom!");
}
}
二,键空间事件通知
1. 配置redis键空间事件通知
redis2.8.0版本之后推出了键空间事件通知,如何使用呢?当redis的key被删除时,redis会发送两种不同类型的事件,特定的事件会往特定的频道发送通知,我们只要订阅这个特定的频道等待通知即可。
两种不同类型的事件:
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
以keyspace为前缀的频道被称为键空间通知(key-space notification),订阅这个频道 keyspace@0:mykey,可以接收0号数据库中所有修改键 mykey 的事件,订阅者将接收到被执行的事件的名字,就是 del;
以keyevent为前缀的频道则称为键事件通知(key-event notification),订阅这个频道 keyevent@0:del,则可以接收0号数据库中所有执行 del 命令的键,订阅者将接收到被执行事件的键的名字,就是 mykey。
我们有两种方式开启键空间事件通知功能,或者只接受特定类型的通知:
通过修改redis.conf配置文件
# 默认为空,表示不开启键空间通知功能 notify-keyspace-events ""通过CONFIG SET命令来设定notify-keyspace-events参数
# xx代表订阅事件的类型 CONFIG SET notify-keyspace-events xx当服务器开启键空间事件通知功能时,需要指定事件的类型,即开启哪些特定类型的通知。在server.h头文件中,Redis设定了一系列的宏定义,用来标识事件的类型。其中,每一个宏定义代表的事件类型如下:
| 事件代号 | 事件类型 |
|---|---|
| K | 键空间通知,所有通知以keyspace@为前缀 |
| E | 键事件通知,所有通知以keyevent@为前缀 |
| g | DEL、EXPIRE、RENAME等类型无关的通用命令 |
| $ | 字符串命令的通知 |
| l | 列表命令的通知 |
| s | 集合命令的通知 |
| h | 哈希命令的通知 |
| z | 有序集合命令的通知 |
| x | 过期事件:每当有过期键被删除时发送 |
| e | 驱逐事件:每当有键因为maxmemory政策而被删除时发送 |
| A | 参数g$lshzxe的别名,代表全部上述全部命令 |
关于notify-keyspace-events的设定,输入参数必须至少要有一个K或者E,用来标识该通知是键空间还是键事件;如果不包含,不管其余参数为什么,都将不会有任何通知被分发。
2.源码实现
键空间事件通知的源码实现仅仅只有三个函数,这三个函数声明在server.h头文件中:
/* 键空间事件通知 */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
//将Notify设置参数由字符串转换成标识量flag
int keyspaceEventsStringToFlags(char *classes);
//将Notify设置参数由标识量flags转换成字符串
sds keyspaceEventsFlagsToString(int flags);
- keyspaceEventsStringToFlags 函数其实就是遍历每一个字符,做或映射。
- keyspaceEventsFlagsToString函数就是根据传过来的flag参数判断是哪种宏定义,然后在判断是键空间通知还是键事件通知【如果都不是,在sds最后拼接一个字符m】,返回结果。
- notifyKeyspaceEvent函数其实是利用Redis的订阅发布实现的键空间事件通知,当数据库中的键发生改变且服务器开启了相应的事件类型通知时,Redis就会发送键事件通知,通过pub/sub命令来告知客户端此刻数据库中的修改操作。
3.Notify应用
开启两个客户端:
- 在客户端一执行:
PSUBSCRIBE __keyevent*,这样就开始订阅了符合模式串__keyevent*的事件。 - 在客户端二执行:
CONFIG SET notify-keyspace-events KEA,设置开启键空间事件通知,然后运行SET命令,这时客户端一就可以接收到这个事件。


Java代码实现Redis键空间失效通知:整个流程其实就是提前在项目里面配置好key过期的监听器,一旦redis中的某个key失效,就可以通过监听器感知到,然后执行相应的业务逻辑。
配置类
@SpringBootConfiguration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(LettuceConnectionFactory lettuceConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(lettuceConnectionFactory);
return container;
}
}
�Redis键空间失效通知监听器
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String token = message.toString();
log.error("token:{}", token);
}
}
测试用例
@SpringBootTest(classes = RedisMain.class)
@Slf4j
class RedisListenerTest {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Test
void contextLoads() throws Exception {
log.info("执行发布");
redisTemplate.opsForValue().setIfAbsent("yhd-keyspace-notify", "二十", 3l, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(5);
}
}
