Redis的持久化机制
- RDB
- redis会在满足一定条件的时候,生成一个持久化的文件dump.rdb,当我们 需要恢复数据时,就会去加载这个持久化文件,把数据恢复到内存中。
- 主动生成的条件:
- shutdown 他会生成dump.rdb (不重要)
- flushAll和flushDB的其实也会生成dump.rdb(不重要)
- 手动调用save和bgsave也会生成dump.rdb
- save同步阻塞指令:当调用这个指令的时候,redis中 所有操作都会停下来,然后去持久化数据
- bgsave异步非阻塞指令:异步的在redis中fork出来一条和主进程一模一样的子进程,主进程继续响应客户端请求,子进程就去做持久化数据写入到他自己的temp.rdb文件中,子进程处理完数据再把temp中的数据输出到dump.rdb文件中,然后销毁temp.rdb文件和fork出来的子进程。(被动生成时调用的就是这个指令)
- 被动生成条件:
- after 900 sec (15 min) if at least 1 key changed
- 15分钟之内有一次操作,15后就会自动生成
- after 300 sec (5 min) if at least 10 keys changed
- 5分钟之内有10次操作,5分钟之后会自动生成
- after 60 sec if at least 10000 keys changed
- 一分钟内有一万次操作,一分钟后会自动生成
- after 900 sec (15 min) if at least 1 key changed
- 生成格式:调用bgsave异步非阻塞指令,底层是一个二进制文件
AOF
- 以日志的方式追加每一次操作,把写操作的指令全部记录下来,如果需要恢复数据,把这些指令重新执行一遍。
- aof的同步机制:缓冲区的数据什么时候刷写到磁盘?
- erver sec: 每秒刷一次
- 优点:性能高,缺点是:丢一秒数据
- always:总是
- 优点:最多丢一条 缺点:很慢
- no:交给操作系统来决定
- 优点:性能最高 缺点:完全不可控
- erver sec: 每秒刷一次
- aof文件在什么情况下进行重写(文件瘦身)
- auto-aof-rewrite-percentage 100 : 文件大小是上次重写的100%
- auto-aof-rewrite-min-size 64mb :大小必须要超过64MB
- 如何判断持久化机制在rdb和aof共存的时候先执行谁?
什么是redis缓存击穿?
- 缓存中没有但数据库中有的数据(一般是缓存时间到期),突然并发用户特别多,同时读缓存没有读到数据,又同时去数据库读,造成数据库压力瞬间增大。
解决方法?
什么是redis的缓存穿透?
- 缓存和数据库中都没有这个数据,而用户不断发起请求,比如id为-1或者id为一个不存在的数据,这时用户可能是一个攻击者,数量多了会导致数据库压力过大。
- 解决方案:
- 接口层增加校验,如用户鉴权校验,id做基础检验 ,当id<=0的直接就拦截了。
- id不是一个负数但是是一个固定的数据,把数据库不存在的数据 也写入缓存中,返回给用户null。
- 当 数据是随机数时,使用布隆过滤器来解决
布隆过滤器如何解决缓存穿透问题?
什么是redis的缓存雪崩?
- redis中的key大量过期,然后有大量的人同一时刻来访问数据,由于大量key过期导致大量的请求,其实还是会打到数据库上,从而使得数据库压力过大,出现崩溃。
解决方案?
当我们直接对数据进行修改,没有进行任何缓存处理,所以数据库中的数据其实已经变了,然而缓存中的数据还是原来的旧数据,此时就会出现 缓存和数据库中的数据不一致
解决方法:
主从的作用?
- 为了体现服务器的高可用,一主多从,实现读写分离,写的时候找主机,读的时候找从机,只要不是master宕机,其中一个salve从机宕机了,还可以从其他从机里面读数据。
- 主从的数据同步方式及其原理
- 全量同步(更新):第一次连接就是全量同步,master把所有的数据都发送给salve。
- 原理:因为我们配从不配主,当我们向salve发送主从指令之后,salve执行replicaof命令,这个命令就会携带者自己的offset和replid去请求连接master,并且请求数据同步,master接收到这个数据之后,就拿着salve传递过来的replid和他自己的id进行比较,如果不同,说明是第一次连接,master会去执行bgsave指令,异步的生成rdb二进制文件(dump.rdb),同时他还会开辟一个缓冲区(repl_baklog),把新来的数据写入到缓冲区里边去,并且写入之后offset会依次递增;当rdb文件生成好,master就会把这个文件发送给salve,从机就会清空自己的dump.rdb文件,然后执行master发送过来的dump.rdb文件内容,进行数据恢复;salve恢复完后master再把缓存区中的数据直接发送给salve,此时主从之间数据就同步完成了。
- 全量同步(更新):第一次连接就是全量同步,master把所有的数据都发送给salve。
- 增量同步(更新):全量同步完成后,后面只要salve不出现长时间卡顿或者长时间重启不成功,就一直是增量同步。
- 原理:当salve重启了之后,再次去尝试连接master,此时还是会把 replid和offset提交给master,然后master再次判断是不是第一次连接,不是,就进行增量更新,他会通过salve提交过来的offset判断repl_baklog中的数据同步到哪个位置了, 然后把offset后面的指令发送给salve,salve就去执行这些指令。当然,如果卡顿或者重启间隔的时间太长了,超过了master中缓存区的一定范围,前面的未同步的数据就会被覆盖,此时,当salve重启连接上了master,此时就需要再次发起全量更新。
Redis的哨兵原理
哨兵的作用和原理?
- redis配置主从之后,只要salve不全部宕机,都可以读取到数据,但是当master宕机了怎么办,数据没法写了,从机也没法同步数据了。哨兵的作用就是为了解决master宕机问题。哨兵一般有很多个而且是奇数个,这些哨兵就和master和salve建立起来连接,然后其中一个哨兵每隔一秒就会给master发送一个指令,master就会回复sentinel指令,如果sentinel没有收到master响应的指令,此时这个哨兵就认为master是主观下线(-sdown),他就会通知其他的哨兵给master发指令,如果超过一半以上的哨兵没有收到响应,此时sentinel就会认为master是客观下线(-odown),这个时候哨兵们就会选举一个哨兵出来从salve中选举一个master。
主从+哨兵
cluster模式
Redis的脑裂是什么?
- 如果当前主库突然暂时性的掉线了,而不是真的故障了,此时哨兵启动了主从切换机制,当假故障的主库恢复后,又开始处理请求了,但是现在哨兵又已经选出来新的主库了, 现在新的主库和旧的主库都存在,这就是脑裂现象。
- 脑裂有什么影响?
- 客户端不知道应该往哪个主节点上写数据,然后不同的客户端就往不同的主节点上写数据,等到哨兵让原主库变成新主库的从库,进行全量更新时,原主库中的数据就会丢失。
- 如何解决脑裂问题?
- Redis中有两个关键的配置项可以解决这个问题
- min-slaves-to-write(最小从服务器数)
- 设置最小从服务数可以保证当主库没有连接到足够多的从库时,主库无法正确写入,以此来避免数据丢失
- min-slaves-max-lag(从连接的最大延迟时间)
- 配置从库和主库进行数据复制时的ACK消息延迟的最大时间,如果主库在和从库进行数据复制时消息延迟时间超出了还没有获得从库的响应,主库就不再接收客户端的请求了,这样就只会丢失一小点数据。
- min-slaves-to-write(最小从服务器数)
- Redis中有两个关键的配置项可以解决这个问题
数据丢失就一定是脑裂吗?
redis的事务是弱事务,当我们multi打开事务,他就会把我们要执行的命令放入到queue中,只有当我们exec提交事务的时候,才进行序列化,串行化的一次性执行这些指令,中间不会插入其他命令。他的执行过程并没有保证原子性,当他执行的指令有逻辑问题的时候,其实他并不知道,只有当提交事务的时候去串行化执行才会出错,但是这条指令出错并不影响其他指令的执行。但是当出现语法错误时,queue中的监视器就已经检测到了。他就会把queue中的命令全部清除了,提交事务的时候其实里面已经没有命令了。
watch key:监视一个或者,多个key,如果开启事务后在提交事务之前这个key被其他命令所更改,那么事务将会被打断(乐观锁的思想)。
Redis的过期策略
Redis的过期策略分为两种
当redis的内存达到maxmemory的时候,redis就会选择一些方案来移除内存中的一部分数据,maxmemory并不是内存的最大值,而是我们设定的一个值, 一般大小不要超过总内存的80%。
- 常见方案:
- noeviction:继续读,停止写。这样做能够保证不丢失数据,但是会使得线上的业务无法继续进行。这是redis的默认淘汰策略。
- volatile-lru:尝试淘汰设置了过期时间的key,再根据上一次访问的时间把距离上次访问时间更久的删除。
- allkeys-lru:区别于volatile-lru,这个策略是对全体key集合进行LRU策略淘汰,而不单单只是过期key集合。
- volatile-lfu:即最少使用的key优先被淘汰,也就是优先淘汰最近最少使用的key。这样做可以保证需要持久化的数据不会突然丢失。
- allkeys-lfu:
- volatile-ttl:尝试淘汰设置了过期时间的key,只是淘汰的策略变为比较key的剩余寿命ttl的值,ttl越小越优先被淘汰。也就是优先淘汰快消亡的key。
- volatile-random:尝试淘汰设置了过期时间的key,只是淘汰的策略变为随机淘汰,即淘汰过期key集合中随机的key。
- allkeys-random:作用于全体key集合的随机淘汰。
Redis的底层IO多路复用模型
- 常见方案:
redis底层IO多路复用模型
- redis底层把任何的操作都看成一个事件,一台服务器访问另一台服务器建立socket连接就是一个连接事件。当有多个请求想要和redis建立连接,redis就会分配一条线程去采用轮询的方式监听所有来的事件(NIO多路复用模型),每一个事件到达服务器端不会立即被执行,而是全部进入一个队列中,串行化的通过文件分发器调用相应的处理器,比如连接事件就会调用连接处理器去新建一个和原有socket配对的socket。redis建立起来的连接一般是长连接,后续不会再产生性能影响。连接建立后,再发送其他事件,比如响应事件,再通过队列进入事件分发器调用响应处理器处理事件,进入redisCinet中,然后再进入输入缓冲区找到对应的redisCommnd解析器中解析指令,进入内存空间获取到值再通过输出缓冲区返回给redisCinet,再通过socket返回给客户端服务器。
- redis并发高的原因
- Redis是纯内存数据库,一般都是简单的存取操作,线程占用的时间很多,时间的花费主要集中在IO上,所以读取速度快。
- 再说一下IO,Redis使用的是非阻塞IO,IO多路复用,使用了单线程来轮询描述符,将数据库的开、关、读、写都转换成了事件,减少了线程切换时上下文的切换和竞争。( 多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),且Redis在内存中操作数据的速度非常快(内存内的操作不会成为这里的性能瓶颈),主要以上两点造就了Redis具有很高的吞吐量。)
- Redis采用了单线程的模型,保证了每个操作的原子性,也减少了线程的上下文切换和竞争。
- 另外,数据结构也帮了不少忙,Redis全程使用hash结构,读取速度快,还有一些特殊的数据结构,对数据存储进行了优化,如压缩表,对短数据进行压缩存储,再如,跳表,使用有序的数据结构加快读取的速度。
- 还有一点,Redis采用自己实现的事件分离器,效率比较高,内部采用非阻塞的执行方式,吞吐能力比较大。
redis执行慢和使用多线程的场景
工具代码 ```java public class RedisLock { RedissonClient redissonClient; public RedisLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
- 互斥锁,seconds秒后自动失效
- @param key
@param seconds */ public boolean lock(String key, int seconds) { // 获得我们的锁对象 RLock rLock = redissonClient.getLock(key);
if (rLock.isLocked()) {
//如果有人正在持有这把锁,则直接失败
return false;
} //大量线程只有一部分可以到达这里进行自旋 rLock.lock(seconds, TimeUnit.SECONDS); return true; }
/**
- 互斥锁,自动续期 *
@param key */ public boolean lock(String key) {
//如果说有人持有锁,其他人来执行,这个时候,会直接返回false //如果说真的有大量的请求的 同一时刻就真的完完全全的同一时刻 RLock rLock = redissonClient.getLock(key); //表示当前这把锁是否被人持有着 if (rLock.isLocked()) {
return false;
} //才是我们之前用的那个自旋的锁方法 // 大量线程中,可能会有少量的线程来同时走到这个方法 rLock.lock(); return true; }
public boolean tryLock(String key, long timeout) throws InterruptedException {
RLock rLock = redissonClient.getLock(key);
return rLock.tryLock(timeout, TimeUnit.MILLISECONDS);
}
/**
* 手动释放锁
*
* @param key
*/
public void unlock(String key) {
RLock rLock = redissonClient.getLock(key);
if (rLock.isLocked()) {
rLock.unlock();
}
}
}
```java
@Component
public class RedisCache {
private RedisTemplate redisTemplate;
public RedisCache(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 缓存存储
*
* @param key
* @param value
* @param seconds
* @return void
* @author xc
*/
public void set(String key, String value, int seconds){
ValueOperations<String,String> vo = redisTemplate.opsForValue();
if(seconds > 0){
vo.set(key, value, seconds, TimeUnit.SECONDS);
}else{
vo.set(key, value);
}
}
/**
* 缓存获取
*
* @param key
* @return java.lang.String
* @author xc
*/
public String get(String key){
ValueOperations<String,String> vo = redisTemplate.opsForValue();
return vo.get(key);
}
/**
* 缓存手动失效
*
* @param key
* @return boolean
* @author xc
*/
public boolean delete(String key){
return redisTemplate.delete(key);
}
/**
* 缓存存储并设置过期时间
*
* @param key
* @param value
* @param time
* @return void
* @author xc
*/
public void setex(String key, String value, long time) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
}
/**
* 缓存批量获取
*
* @param keyList
* @return java.util.List
* @author xc
*/
public List mget(List<String> keyList) {
return redisTemplate.opsForValue().multiGet(keyList);
}
/**
* 删除key下的多个值
*
* @param key
* @param values
* @return void
* @author xc
*/
public void srem(String key, String[] values) {
redisTemplate.opsForSet().remove(key, values);
}
/**
* 删除key下的多个值
*
* @param key
* @param values
* @return void
* @author xc
*/
public void sadd(String key, String[] values) {
redisTemplate.opsForSet().add(key, values);
}
/**
* 缓存成员获取
*
* @param key
* @return java.util.Set
* @author xc
*/
public Set smembers(String key) {
return redisTemplate.opsForSet().members(key);
}
/**
* 缓存成员是否存在
*
* @param key
* @param member
* @return java.lang.Boolean
* @author xc
*/
public Boolean sismember(String key, String member) {
return redisTemplate.opsForSet().isMember(key, member);
}
/**
* 缓存有序区间值
* @param key
* @param min
* @param max
* @param offset
* @param count
* @return java.util.Set
* @author xc
*/
public Set zrangeByScore(String key, double min, double max, long offset, long count) {
return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
}
/**
* 缓存有序区间值
*
* @param key
* @param min
* @param max
* @return java.util.Set
* @author xc
*/
public Set zrangeByScore2(String key, double min, double max) {
return redisTemplate.opsForZSet().rangeByScore(key, min, max);
}
/**
* 倒序返回zset区间值
* @param key
* @param start
* @param end
* @return
*/
public Set zrevrange(String key, long start, long end) {
return redisTemplate.opsForZSet().reverseRange(key, start, end);
}
/**
* 缓存倒序排列指定区间值
*
* @param key
* @param min
* @param max
* @param offset
* @param count
* @return java.util.Set
* @author xc
*/
public Set zrevrangeByScore(String key, double min, double max, long offset, long count) {
return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max, offset, count);
}
/**
* 缓存有序存储
*
* @param key
* @param member
* @param score
* @return java.lang.Boolean
* @author xc
*/
public Boolean zadd(String key, String member, double score) {
return redisTemplate.opsForZSet().add(key, member, score);
}
/**
* 缓存有序存储
*
* @param key
* @param values
* @return java.lang.Long
* @author xc
*/
public Long zremove(String key, String... values) {
return redisTemplate.opsForZSet().remove(key, values);
}
/**
* 缓存有序数量
*
* @param key
* @return java.lang.Long
* @author xc
*/
public Long zcard(String key) {
return redisTemplate.opsForZSet().zCard(key);
}
/**
* 判断hash key是否存在
*
* @param key
* @return
*/
public boolean hExists(String key) {
return hGetAll(key).isEmpty();
}
/**
* 判断hash field是否存在
* @return
*/
public boolean hFieldExists(String key, String field) {
return redisTemplate.opsForHash().hasKey(key, field);
}
/**
* 获取hash变量中的键值对
* 对应redis hgetall 命令
*
* @param key
* @return
*/
public Map<String, String> hGetAll(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* 获取hash变量中的
* @return
*/
public Object hGet(String key, String field) {
return redisTemplate.opsForHash().get(key, field);
}
/**
* hash变量中field对应的value自增
* @param key
* @param field
* @param increment
* @return
*/
public Long hIncr(String key, String field, Integer increment) {
return redisTemplate.opsForHash().increment(key, field, increment);
}
/**
* hash变量field对应的value自增
* @param key
* @param field
* @return
*/
public Long hIncr(String key, String field) {
return hIncr(key, field, 1);
}
/**
* 获取hash变量中的field数量
* 对应redis hlen 命令
*
* @param key
* @return
*/
public Long hLen(String key) {
return redisTemplate.opsForHash().size(key);
}
/**
* 添加hash的value
* @param key
* @param field
* @param value
*/
public void hPut(String key, String field, Object value) {
redisTemplate.opsForHash().put(key, field, value);
}
/**
* 以map集合的形式添加hash键值对
*
* @param key
* @param map
*/
public void hPutAll(String key, Map<String, String> map) {
redisTemplate.opsForHash().putAll(key, map);
}
/**
* 删除某个field
* @param key
* @param field
*/
public long hDel(String key, String field) {
return redisTemplate.opsForHash().delete(key, field);
}
/**
* 设置key过期时间
* @param key
* @param time
* @param unit
*/
public void expire(String key, long time, TimeUnit unit) {
redisTemplate.expire(key, time, unit);
}
/**
* 以list集合的形式添加数据
*
* @param key
* @return
*/
public Boolean hasKey(String key) {
return redisTemplate.hasKey(key);
}
/**
* 以list集合的形式添加数据
*
* @param key
* @param values
* @return
*/
public Long lPushAll(String key, String... values) {
return redisTemplate.opsForList().leftPushAll(key, values);
}
/**
* 以list集合的形式添加数据
*
* @param key
* @param values
* @return
*/
public Long lPushAll(String key, List<String> values) {
return redisTemplate.opsForList().leftPushAll(key, values);
}
/**
* 以list集合的形式添加数据
*
* @param key
* @param values
* @return
*/
public Long rPushAll(String key, String... values) {
return redisTemplate.opsForList().rightPushAll(key, values);
}
/**
* 以list集合的形式添加数据
*
* @param key
* @param values
* @return
*/
public Long rPushAll(String key, List<String> values) {
return redisTemplate.opsForList().rightPushAll(key, values);
}
/**
* 返回list集合下表区间的元素
*
* @param key
* @param start
* @param end
* @return
*/
public List<String> lRange(String key, long start, long end) {
return redisTemplate.opsForList().range(key, start, end);
}
/**
* 返回list集合的大小
*
* @param key
* @return
*/
public Long lsize(String key) {
return redisTemplate.opsForList().size(key);
}
/**
* 设置缓存过期时间
*
* @param key
* @return
*/
public void expire(String key, long time) {
this.expire(key, time, TimeUnit.SECONDS);
}
/**
* 执行lua脚本
*
* @param script
* @param keys
* @param args
* @param <T>
* @return
*/
public <T> T execute(RedisScript<T> script, List<String> keys, String... args) {
return (T) redisTemplate.execute(script, keys, args);
}
/**
* 缓存存储并设置过期时间
*
* @param key
* @param value
* @param time
* @param timeUnit
* @return void
*/
public void setex(String key, String value, long time, TimeUnit timeUnit) {
redisTemplate.opsForValue().set(key, value, time, timeUnit);
}
/**
* 缓存增量
*
* @param key
* @param increment
* @return java.lang.Long
*/
public Long increment(String key, long increment) {
return redisTemplate.opsForValue().increment(key, increment);
}
/**
* 缓存失效时间
*
* @param key
* @param timeUnit
* @return java.lang.Long
*/
public Long getExpire(String key, TimeUnit timeUnit) {
return redisTemplate.getExpire(key, timeUnit);
}
/**
* 指定缓存失效时间
*
* @param key
* @param date
* @return java.lang.Boolean
*/
public Boolean expireAt(String key, Date date) {
return redisTemplate.expireAt(key, date);
}
}
/**
* redis key 常量
* @author xc
*/
public class RedisKeyConstants {
/**
* 用户信息锁修改锁前缀
*/
public static final String USER_UPDATE_LOCK_PREFIX = "user_update_lock:";
/**
* 用户锁信息锁前缀
*/
public static final String USER_LOCK_PREFIX = "user_info_lock:";
/**
* 用户详情前缀
*/
public static final String USER_INFO_PREFIX = "user_info_prefix:";
}
- 初始代码
```java
/
根据id查询
/
@Override
public ApUserRealname findById(Integer apUserId) {
} / 更新修改 / @Override public void update(ApUserRealname apUserRealname) {String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId;
String jsonUser = stringRedisTemplate.opsForValue().get(userInfoKey);
if(jsonUser == null){
ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
String redisJson = JSON.toJSONString(apUserRealname);
//将数据放入到redis中
stringRedisTemplate.opsForValue().set(userInfoKey,redisJson);
//返回从数据库中查询到的数据
return apUserRealname;
}
ApUserRealname apUserRealname = JsonUtil.json2Object(jsonUser, ApUserRealname.class);
return apUserRealname;
}apRealMapper.updateById(apUserRealname);
- 优化一:避免大量key同一时间过期,缓存雪崩问题(惊群问题)
```java
@Override
public void update(ApUserRealname apUserRealname) {
String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserRealname.getUserId();
apRealMapper.updateById(apUserRealname);
//当把数据进行修改之后,我们需要把数据保存到redis里边去
// 为了防止缓存雪崩,也就是缓存的集群效应,我们需要使用固定时间 + 随机时间
redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
}
优化二:防止幂等性问题(使用优化过后的自旋锁,企业级开发几乎不会使用无脑的自旋锁,因为性能)
@Override
public void update(ApUserRealname apUserRealname) {
//如果出现了网络卡顿,可能会导致一个请求,就可能导致一个请求发起了多次,导致这个操作进行了多次,修改是这个样子,保存其实也是这个样子
String redisLockKey = RedisKeyConstants.USER_INFO_PREFIX + apUserRealname.getUserId();
if(!redisLock.lock(redisLockKey)){
//如果是false,就是触发幂等操作了
throw new RuntimeException("请不要幂等操作");
}
try{
redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
}finally {
//释放锁
redisLock.unlock(userInfoKey);
}
}
优化三:解决缓存穿透问题(并且设置了冷热数据过期,就是冷数据过期,热数据续期 )
@Override
public ApUserRealname findById(Integer apUserId) {
String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId;
//有没有一种可能性,这个代码是一个缓存穿透代码
String jsonUser = stringRedisTemplate.opsForValue().get(userInfoKey);
if (StringUtils.hasLength(jsonUser)) // 有对象 {}
//说明是有内容的,但是这个内容可能是一个穿透代码
if(Objects.equals(CacheSupport.EMPTY_CACHE,jsonUser)){
//说明是一个穿透代码,直接给他返回一个空对象即可
return new ApUserRealname();
}
//走到这里说明他并非是一个空对象,是确实有数据,那么我应该给当前这个数据进行续期
redisCache.expire(userInfoKey,CacheSupport.generateCacheExpireSecond());
//把他转换成json返回
return JsonUtil.json2Object(jsonUser,ApUserRealname.class);
}
//说明redis中没有数据
ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
if(apUserRealname == null){
//如果说数据库是个空,那么需要和之前的数据对应起来
redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
return new ApUserRealname();
}
//走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
return apUserRealname;
}
优化四:抽取方法 ```java @Override public ApUserRealname findById(Integer apUserId) {
String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId; ApUserRealname apUserRealname = readDataFromRedis(userInfoKey); if(!Objects.isNull(apUserRealname)){
// redis中存在
return apUserRealname;
}
return getDataFromDb(userInfoKey,apUserId); }
private ApUserRealname getDataFromDb(String userInfoKey ,Integer apUserId) {
ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
if(apUserRealname == null){
//如果说数据库是个空,那么需要和之前的数据对应起来
redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
return new ApUserRealname();
}
//走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
return apUserRealname;
}
- 优化 五:解决缓存击穿问题
```java
private ApUserRealname getDataFromDb(String userInfoKey ,Integer apUserId) {
//要避免缓存击穿代码,热点key过期,大量线程涌入,直击数据库,添加分布式锁,但是不能添加自旋锁,避免一次自旋次数太多。
String redisLockKey = RedisKeyConstants.USER_LOCK_PREFIX + apUserId;
boolean lock = redisLock.lock(redisLockKey);
if(!lock){
//只有少部分可以进入到自旋中, 大部分人直接失败
throw new RuntimeException("请稍后再查");
}
try{
ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
if(apUserRealname == null){
//如果说数据库是个空,那么需要和之前的数据对应起来
redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
return new ApUserRealname();
}
//走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
return apUserRealname;
}finally {
redisLock.unlock(userInfoKey);
}
}
- 优化六:解决双写一致性问题,更新与查询用同一把锁
- 优化七:doublecheck + tryLock 进行数据优化
private ApUserRealname getDataFromDb(String userInfoKey, Integer apUserId) {
//要避免缓存击穿代码,热点key过期,大量线程涌入,直击数据库,添加分布式锁,但是不能添加自旋锁,避免一次自旋次数太多。
String redisLockKey = RedisKeyConstants.USER_LOCK_PREFIX + apUserId;
boolean lock = false;
try {
//大家都尝试去抢锁,但是只能有一个人抢锁成功,如果在规定的时间之内,没有抢锁成功,那么就就会直接返回false
//在1s的时间内,其实当前这个方法串行化是完全可以执行成功的,因为毕竟此时数据库的压力不大,
lock = redisLock.tryLock(redisLockKey, 1);
} catch (InterruptedException e) {
ApUserRealname apUserRealname = readDataFromRedis(userInfoKey);
if (apUserRealname != null) {
return apUserRealname;
}
e.printStackTrace();
}
if (!lock) {
//只有少部分可以进入到自旋中, 大部分人直接失败\
ApUserRealname apUserRealname = readDataFromRedis(userInfoKey);
if (apUserRealname != null) {
return apUserRealname;
}
throw new RuntimeException("请稍后再查");
}
try {
ApUserRealname redisData = readDataFromRedis(userInfoKey);
if(redisData != null){
return redisData;
}
//线程a,他就去访问这个数据库,他读取到了一个旧数据,然后这个时候,有人去修改了数据
ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
if (apUserRealname == null) {
//如果说数据库是个空,那么需要和之前的数据对应起来
redisCache.set(userInfoKey, CacheSupport.EMPTY_CACHE, CacheSupport.generateCacheExpireSecond());
return new ApUserRealname();
}
//走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间、
// 此时下边这个线程反应过来了,他接着向缓存中写入数据,旧的数据就覆盖了新的数据,这就是db+缓存双写问题
redisCache.set(userInfoKey, JsonUtil.object2Json(apUserRealname), CacheSupport.generateCacheExpireSecond());
return apUserRealname;
} finally {
redisLock.unlock(userInfoKey);
}