Redis的持久化机制

  • RDB
    • redis会在满足一定条件的时候,生成一个持久化的文件dump.rdb,当我们 需要恢复数据时,就会去加载这个持久化文件,把数据恢复到内存中。
    • 主动生成的条件:
      • shutdown 他会生成dump.rdb (不重要)
      • flushAll和flushDB的其实也会生成dump.rdb(不重要)
      • 手动调用save和bgsave也会生成dump.rdb
    • save同步阻塞指令:当调用这个指令的时候,redis中 所有操作都会停下来,然后去持久化数据
    • bgsave异步非阻塞指令:异步的在redis中fork出来一条和主进程一模一样的子进程,主进程继续响应客户端请求,子进程就去做持久化数据写入到他自己的temp.rdb文件中,子进程处理完数据再把temp中的数据输出到dump.rdb文件中,然后销毁temp.rdb文件和fork出来的子进程。(被动生成时调用的就是这个指令)
    • 被动生成条件:
      • after 900 sec (15 min) if at least 1 key changed
        • 15分钟之内有一次操作,15后就会自动生成
      • after 300 sec (5 min) if at least 10 keys changed
        • 5分钟之内有10次操作,5分钟之后会自动生成
      • after 60 sec if at least 10000 keys changed
        • 一分钟内有一万次操作,一分钟后会自动生成
    • 生成格式:调用bgsave异步非阻塞指令,底层是一个二进制文件
  • AOF

    • 以日志的方式追加每一次操作,把写操作的指令全部记录下来,如果需要恢复数据,把这些指令重新执行一遍。
    • aof的同步机制:缓冲区的数据什么时候刷写到磁盘?
      • erver sec: 每秒刷一次
        • 优点:性能高,缺点是:丢一秒数据
      • always:总是
        • 优点:最多丢一条 缺点:很慢
      • no:交给操作系统来决定
        • 优点:性能最高 缺点:完全不可控
    • aof文件在什么情况下进行重写(文件瘦身)
      • auto-aof-rewrite-percentage 100 : 文件大小是上次重写的100%
      • auto-aof-rewrite-min-size 64mb :大小必须要超过64MB
    • 如何判断持久化机制在rdb和aof共存的时候先执行谁?
      • 把rdb或者aof中的其中一个文件损坏,重启redis看能不能重启成功。rdb坏,能重启,先执行aof;反之先执行rdb。

        redis的缓存击穿

  • 什么是redis缓存击穿?

    • 缓存中没有但数据库中有的数据(一般是缓存时间到期),突然并发用户特别多,同时读缓存没有读到数据,又同时去数据库读,造成数据库压力瞬间增大。
  • 解决方法?

    • 热数据永不过期(缺点:可能有突然火起来的数据)
    • 加一把互斥锁

      redis的缓存穿透

  • 什么是redis的缓存穿透?

    • 缓存和数据库中都没有这个数据,而用户不断发起请求,比如id为-1或者id为一个不存在的数据,这时用户可能是一个攻击者,数量多了会导致数据库压力过大。
  • 解决方案:
    • 接口层增加校验,如用户鉴权校验,id做基础检验 ,当id<=0的直接就拦截了。
    • id不是一个负数但是是一个固定的数据,把数据库不存在的数据 也写入缓存中,返回给用户null。
    • 当 数据是随机数时,使用布隆过滤器来解决
  • 布隆过滤器如何解决缓存穿透问题?

    • 将存在的值建立一个高效的检索(集合)。每次缓存取值时,先走一次判空检索。他本质上是一个二进制数组,对每一个存入集合的数据进行三次哈希,确定三次哈希的位置,设置为 1,如果在 Hash 后,原始位它是 0 的话,将其从 0 变为 1;如果本身这一位就是 1 的话,则保持不变(哈希冲突)。
    • 当查询一个数据时,通过哈希函数计算哈希值,判断计算的三次哈希值对应的位置的值,三个值中,只要有一个不是1,那么我们认为数据是不存在的。
    • 布隆过滤器只能精准判断这个数据不存在的情况,对于存在的数据可能会误判。

      redis的缓存雪崩

  • 什么是redis的缓存雪崩?

    • redis中的key大量过期,然后有大量的人同一时刻来访问数据,由于大量key过期导致大量的请求,其实还是会打到数据库上,从而使得数据库压力过大,出现崩溃。
  • 解决方案?

    • 让数据库key的过期变成随机的,一般是固定时间+随机时间

      双写一致性问题

  • 当我们直接对数据进行修改,没有进行任何缓存处理,所以数据库中的数据其实已经变了,然而缓存中的数据还是原来的旧数据,此时就会出现 缓存和数据库中的数据不一致

  • 解决方法:

    • 删除缓存,重新加载
    • 加分布式锁

      Redis的主从原理

  • 主从的作用?

    • 为了体现服务器的高可用,一主多从,实现读写分离,写的时候找主机,读的时候找从机,只要不是master宕机,其中一个salve从机宕机了,还可以从其他从机里面读数据。
  • 主从的数据同步方式及其原理
    • 全量同步(更新):第一次连接就是全量同步,master把所有的数据都发送给salve。
      • 原理:因为我们配从不配主,当我们向salve发送主从指令之后,salve执行replicaof命令,这个命令就会携带者自己的offset和replid去请求连接master,并且请求数据同步,master接收到这个数据之后,就拿着salve传递过来的replid和他自己的id进行比较,如果不同,说明是第一次连接,master会去执行bgsave指令,异步的生成rdb二进制文件(dump.rdb),同时他还会开辟一个缓冲区(repl_baklog),把新来的数据写入到缓冲区里边去,并且写入之后offset会依次递增;当rdb文件生成好,master就会把这个文件发送给salve,从机就会清空自己的dump.rdb文件,然后执行master发送过来的dump.rdb文件内容,进行数据恢复;salve恢复完后master再把缓存区中的数据直接发送给salve,此时主从之间数据就同步完成了。

image.png

  • 增量同步(更新):全量同步完成后,后面只要salve不出现长时间卡顿或者长时间重启不成功,就一直是增量同步。
    • 原理:当salve重启了之后,再次去尝试连接master,此时还是会把 replid和offset提交给master,然后master再次判断是不是第一次连接,不是,就进行增量更新,他会通过salve提交过来的offset判断repl_baklog中的数据同步到哪个位置了, 然后把offset后面的指令发送给salve,salve就去执行这些指令。当然,如果卡顿或者重启间隔的时间太长了,超过了master中缓存区的一定范围,前面的未同步的数据就会被覆盖,此时,当salve重启连接上了master,此时就需要再次发起全量更新。

image.png

Redis的哨兵原理

  • 哨兵的作用和原理?

    • redis配置主从之后,只要salve不全部宕机,都可以读取到数据,但是当master宕机了怎么办,数据没法写了,从机也没法同步数据了。哨兵的作用就是为了解决master宕机问题。哨兵一般有很多个而且是奇数个,这些哨兵就和master和salve建立起来连接,然后其中一个哨兵每隔一秒就会给master发送一个指令,master就会回复sentinel指令,如果sentinel没有收到master响应的指令,此时这个哨兵就认为master是主观下线(-sdown),他就会通知其他的哨兵给master发指令,如果超过一半以上的哨兵没有收到响应,此时sentinel就会认为master是客观下线(-odown),这个时候哨兵们就会选举一个哨兵出来从salve中选举一个master。
      • 首先会排除长期不在线的 salve
      • 然后比较salve-prority优先级大小,除0以外,值越小优先级越高
      • 再比salve中offset的大小,越大表示他的数据越新,优先级就越高
      • 最后看服务器id,越小优先级越高。

        redis的集群方案

  • 主从+哨兵

  • cluster模式

    • cluster 模式是 将所有的数据划分为16384的槽位 ,分散到多个节点上,在配置集群的时候指定每个节点分配多少个槽位,当客户端要插入或者拿数据时,先拿着要操作的数据的key,进行CRC16算法,计算出来这个key在哪个槽位上,通过槽位找到具体的节点拿数据或者写数据。cluster 模式也有主从之分,主节点宕机了,从节点就变成主节点,防止数据丢失。

      Redis的集群脑裂

  • Redis的脑裂是什么?

    • 如果当前主库突然暂时性的掉线了,而不是真的故障了,此时哨兵启动了主从切换机制,当假故障的主库恢复后,又开始处理请求了,但是现在哨兵又已经选出来新的主库了, 现在新的主库和旧的主库都存在,这就是脑裂现象。
  • 脑裂有什么影响?
    • 客户端不知道应该往哪个主节点上写数据,然后不同的客户端就往不同的主节点上写数据,等到哨兵让原主库变成新主库的从库,进行全量更新时,原主库中的数据就会丢失。
  • 如何解决脑裂问题?
    • Redis中有两个关键的配置项可以解决这个问题
      • min-slaves-to-write(最小从服务器数)
        • 设置最小从服务数可以保证当主库没有连接到足够多的从库时,主库无法正确写入,以此来避免数据丢失
      • min-slaves-max-lag(从连接的最大延迟时间)
        • 配置从库和主库进行数据复制时的ACK消息延迟的最大时间,如果主库在和从库进行数据复制时消息延迟时间超出了还没有获得从库的响应,主库就不再接收客户端的请求了,这样就只会丢失一小点数据。
  • 数据丢失就一定是脑裂吗?

    • 不一定,异步的同步 数据时如果主库子进程异步的把自己的rdb发送给从库,然后主进程继续接收数据写到缓存区中,从库还没有同步完主库就出故障了,此时缓存区中的数据就丢失了。可以通过复制进度offset来判断出此时的数据丢失是数据同步未完成导致的。

      Redis的事务

  • redis的事务是弱事务,当我们multi打开事务,他就会把我们要执行的命令放入到queue中,只有当我们exec提交事务的时候,才进行序列化,串行化的一次性执行这些指令,中间不会插入其他命令。他的执行过程并没有保证原子性,当他执行的指令有逻辑问题的时候,其实他并不知道,只有当提交事务的时候去串行化执行才会出错,但是这条指令出错并不影响其他指令的执行。但是当出现语法错误时,queue中的监视器就已经检测到了。他就会把queue中的命令全部清除了,提交事务的时候其实里面已经没有命令了。

  • watch key:监视一个或者,多个key,如果开启事务后在提交事务之前这个key被其他命令所更改,那么事务将会被打断(乐观锁的思想)。

    Redis的过期策略

  • Redis的过期策略分为两种

    • 惰性删除
      • 当数据到期之后,其实这个时候redis并不知道这个数据已经到期了,只有当去拿这个数据的时候,他才回去校验一下这个数据过期没有,如果过期了,会先把这个数据删除,并且返回null。
    • 定期删除
      • redis会定期进行一次抽查,首先会指定一个样本数,然后去遍历对应的16个库,抽取一些样本进行检查,如果抽查的样本有25%以上过期了, 他就再抽查一次这个库,没有超过就遍历下一个库。

        Redis的淘汰机制

  • 当redis的内存达到maxmemory的时候,redis就会选择一些方案来移除内存中的一部分数据,maxmemory并不是内存的最大值,而是我们设定的一个值, 一般大小不要超过总内存的80%。

    • 常见方案:
      • noeviction:继续读,停止写。这样做能够保证不丢失数据,但是会使得线上的业务无法继续进行。这是redis的默认淘汰策略。
      • volatile-lru:尝试淘汰设置了过期时间的key,再根据上一次访问的时间把距离上次访问时间更久的删除。
      • allkeys-lru:区别于volatile-lru,这个策略是对全体key集合进行LRU策略淘汰,而不单单只是过期key集合。
      • volatile-lfu:即最少使用的key优先被淘汰,也就是优先淘汰最近最少使用的key。这样做可以保证需要持久化的数据不会突然丢失。
      • allkeys-lfu:
      • volatile-ttl:尝试淘汰设置了过期时间的key,只是淘汰的策略变为比较key的剩余寿命ttl的值,ttl越小越优先被淘汰。也就是优先淘汰快消亡的key。
      • volatile-random:尝试淘汰设置了过期时间的key,只是淘汰的策略变为随机淘汰,即淘汰过期key集合中随机的key。
      • allkeys-random:作用于全体key集合的随机淘汰。

        Redis的底层IO多路复用模型

        c313b68dfe15ef766be85b1d69a781e.jpg
  • redis底层IO多路复用模型

    • redis底层把任何的操作都看成一个事件,一台服务器访问另一台服务器建立socket连接就是一个连接事件。当有多个请求想要和redis建立连接,redis就会分配一条线程去采用轮询的方式监听所有来的事件(NIO多路复用模型),每一个事件到达服务器端不会立即被执行,而是全部进入一个队列中,串行化的通过文件分发器调用相应的处理器,比如连接事件就会调用连接处理器去新建一个和原有socket配对的socket。redis建立起来的连接一般是长连接,后续不会再产生性能影响。连接建立后,再发送其他事件,比如响应事件,再通过队列进入事件分发器调用响应处理器处理事件,进入redisCinet中,然后再进入输入缓冲区找到对应的redisCommnd解析器中解析指令,进入内存空间获取到值再通过输出缓冲区返回给redisCinet,再通过socket返回给客户端服务器。
  • redis并发高的原因
    • Redis是纯内存数据库,一般都是简单的存取操作,线程占用的时间很多,时间的花费主要集中在IO上,所以读取速度快。
    • 再说一下IO,Redis使用的是非阻塞IO,IO多路复用,使用了单线程来轮询描述符,将数据库的开、关、读、写都转换成了事件,减少了线程切换时上下文的切换和竞争。( 多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),且Redis在内存中操作数据的速度非常快(内存内的操作不会成为这里的性能瓶颈),主要以上两点造就了Redis具有很高的吞吐量。)
    • Redis采用了单线程的模型,保证了每个操作的原子性,也减少了线程的上下文切换和竞争。
    • 另外,数据结构也帮了不少忙,Redis全程使用hash结构,读取速度快,还有一些特殊的数据结构,对数据存储进行了优化,如压缩表,对短数据进行压缩存储,再如,跳表,使用有序的数据结构加快读取的速度。
    • 还有一点,Redis采用自己实现的事件分离器,效率比较高,内部采用非阻塞的执行方式,吞吐能力比较大。
  • redis执行慢和使用多线程的场景

    • 大key,大value的时候。

      redis代码编写

  • 工具代码 ```java public class RedisLock { RedissonClient redissonClient; public RedisLock(RedissonClient redissonClient) {

    1. this.redissonClient = redissonClient;

    }

    /**

    • 互斥锁,seconds秒后自动失效
    • @param key
    • @param seconds */ public boolean lock(String key, int seconds) { // 获得我们的锁对象 RLock rLock = redissonClient.getLock(key);

      if (rLock.isLocked()) {

      1. //如果有人正在持有这把锁,则直接失败
      2. return false;

      } //大量线程只有一部分可以到达这里进行自旋 rLock.lock(seconds, TimeUnit.SECONDS); return true; }

      /**

    • 互斥锁,自动续期 *
    • @param key */ public boolean lock(String key) {

      //如果说有人持有锁,其他人来执行,这个时候,会直接返回false //如果说真的有大量的请求的 同一时刻就真的完完全全的同一时刻 RLock rLock = redissonClient.getLock(key); //表示当前这把锁是否被人持有着 if (rLock.isLocked()) {

      1. return false;

      } //才是我们之前用的那个自旋的锁方法 // 大量线程中,可能会有少量的线程来同时走到这个方法 rLock.lock(); return true; }

  1. public boolean tryLock(String key, long timeout) throws InterruptedException {
  2. RLock rLock = redissonClient.getLock(key);
  3. return rLock.tryLock(timeout, TimeUnit.MILLISECONDS);
  4. }
  5. /**
  6. * 手动释放锁
  7. *
  8. * @param key
  9. */
  10. public void unlock(String key) {
  11. RLock rLock = redissonClient.getLock(key);
  12. if (rLock.isLocked()) {
  13. rLock.unlock();
  14. }
  15. }

}

  1. ```java
  2. @Component
  3. public class RedisCache {
  4. private RedisTemplate redisTemplate;
  5. public RedisCache(RedisTemplate redisTemplate) {
  6. this.redisTemplate = redisTemplate;
  7. }
  8. /**
  9. * 缓存存储
  10. *
  11. * @param key
  12. * @param value
  13. * @param seconds
  14. * @return void
  15. * @author xc
  16. */
  17. public void set(String key, String value, int seconds){
  18. ValueOperations<String,String> vo = redisTemplate.opsForValue();
  19. if(seconds > 0){
  20. vo.set(key, value, seconds, TimeUnit.SECONDS);
  21. }else{
  22. vo.set(key, value);
  23. }
  24. }
  25. /**
  26. * 缓存获取
  27. *
  28. * @param key
  29. * @return java.lang.String
  30. * @author xc
  31. */
  32. public String get(String key){
  33. ValueOperations<String,String> vo = redisTemplate.opsForValue();
  34. return vo.get(key);
  35. }
  36. /**
  37. * 缓存手动失效
  38. *
  39. * @param key
  40. * @return boolean
  41. * @author xc
  42. */
  43. public boolean delete(String key){
  44. return redisTemplate.delete(key);
  45. }
  46. /**
  47. * 缓存存储并设置过期时间
  48. *
  49. * @param key
  50. * @param value
  51. * @param time
  52. * @return void
  53. * @author xc
  54. */
  55. public void setex(String key, String value, long time) {
  56. redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
  57. }
  58. /**
  59. * 缓存批量获取
  60. *
  61. * @param keyList
  62. * @return java.util.List
  63. * @author xc
  64. */
  65. public List mget(List<String> keyList) {
  66. return redisTemplate.opsForValue().multiGet(keyList);
  67. }
  68. /**
  69. * 删除key下的多个值
  70. *
  71. * @param key
  72. * @param values
  73. * @return void
  74. * @author xc
  75. */
  76. public void srem(String key, String[] values) {
  77. redisTemplate.opsForSet().remove(key, values);
  78. }
  79. /**
  80. * 删除key下的多个值
  81. *
  82. * @param key
  83. * @param values
  84. * @return void
  85. * @author xc
  86. */
  87. public void sadd(String key, String[] values) {
  88. redisTemplate.opsForSet().add(key, values);
  89. }
  90. /**
  91. * 缓存成员获取
  92. *
  93. * @param key
  94. * @return java.util.Set
  95. * @author xc
  96. */
  97. public Set smembers(String key) {
  98. return redisTemplate.opsForSet().members(key);
  99. }
  100. /**
  101. * 缓存成员是否存在
  102. *
  103. * @param key
  104. * @param member
  105. * @return java.lang.Boolean
  106. * @author xc
  107. */
  108. public Boolean sismember(String key, String member) {
  109. return redisTemplate.opsForSet().isMember(key, member);
  110. }
  111. /**
  112. * 缓存有序区间值
  113. * @param key
  114. * @param min
  115. * @param max
  116. * @param offset
  117. * @param count
  118. * @return java.util.Set
  119. * @author xc
  120. */
  121. public Set zrangeByScore(String key, double min, double max, long offset, long count) {
  122. return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
  123. }
  124. /**
  125. * 缓存有序区间值
  126. *
  127. * @param key
  128. * @param min
  129. * @param max
  130. * @return java.util.Set
  131. * @author xc
  132. */
  133. public Set zrangeByScore2(String key, double min, double max) {
  134. return redisTemplate.opsForZSet().rangeByScore(key, min, max);
  135. }
  136. /**
  137. * 倒序返回zset区间值
  138. * @param key
  139. * @param start
  140. * @param end
  141. * @return
  142. */
  143. public Set zrevrange(String key, long start, long end) {
  144. return redisTemplate.opsForZSet().reverseRange(key, start, end);
  145. }
  146. /**
  147. * 缓存倒序排列指定区间值
  148. *
  149. * @param key
  150. * @param min
  151. * @param max
  152. * @param offset
  153. * @param count
  154. * @return java.util.Set
  155. * @author xc
  156. */
  157. public Set zrevrangeByScore(String key, double min, double max, long offset, long count) {
  158. return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max, offset, count);
  159. }
  160. /**
  161. * 缓存有序存储
  162. *
  163. * @param key
  164. * @param member
  165. * @param score
  166. * @return java.lang.Boolean
  167. * @author xc
  168. */
  169. public Boolean zadd(String key, String member, double score) {
  170. return redisTemplate.opsForZSet().add(key, member, score);
  171. }
  172. /**
  173. * 缓存有序存储
  174. *
  175. * @param key
  176. * @param values
  177. * @return java.lang.Long
  178. * @author xc
  179. */
  180. public Long zremove(String key, String... values) {
  181. return redisTemplate.opsForZSet().remove(key, values);
  182. }
  183. /**
  184. * 缓存有序数量
  185. *
  186. * @param key
  187. * @return java.lang.Long
  188. * @author xc
  189. */
  190. public Long zcard(String key) {
  191. return redisTemplate.opsForZSet().zCard(key);
  192. }
  193. /**
  194. * 判断hash key是否存在
  195. *
  196. * @param key
  197. * @return
  198. */
  199. public boolean hExists(String key) {
  200. return hGetAll(key).isEmpty();
  201. }
  202. /**
  203. * 判断hash field是否存在
  204. * @return
  205. */
  206. public boolean hFieldExists(String key, String field) {
  207. return redisTemplate.opsForHash().hasKey(key, field);
  208. }
  209. /**
  210. * 获取hash变量中的键值对
  211. * 对应redis hgetall 命令
  212. *
  213. * @param key
  214. * @return
  215. */
  216. public Map<String, String> hGetAll(String key) {
  217. return redisTemplate.opsForHash().entries(key);
  218. }
  219. /**
  220. * 获取hash变量中的
  221. * @return
  222. */
  223. public Object hGet(String key, String field) {
  224. return redisTemplate.opsForHash().get(key, field);
  225. }
  226. /**
  227. * hash变量中field对应的value自增
  228. * @param key
  229. * @param field
  230. * @param increment
  231. * @return
  232. */
  233. public Long hIncr(String key, String field, Integer increment) {
  234. return redisTemplate.opsForHash().increment(key, field, increment);
  235. }
  236. /**
  237. * hash变量field对应的value自增
  238. * @param key
  239. * @param field
  240. * @return
  241. */
  242. public Long hIncr(String key, String field) {
  243. return hIncr(key, field, 1);
  244. }
  245. /**
  246. * 获取hash变量中的field数量
  247. * 对应redis hlen 命令
  248. *
  249. * @param key
  250. * @return
  251. */
  252. public Long hLen(String key) {
  253. return redisTemplate.opsForHash().size(key);
  254. }
  255. /**
  256. * 添加hash的value
  257. * @param key
  258. * @param field
  259. * @param value
  260. */
  261. public void hPut(String key, String field, Object value) {
  262. redisTemplate.opsForHash().put(key, field, value);
  263. }
  264. /**
  265. * 以map集合的形式添加hash键值对
  266. *
  267. * @param key
  268. * @param map
  269. */
  270. public void hPutAll(String key, Map<String, String> map) {
  271. redisTemplate.opsForHash().putAll(key, map);
  272. }
  273. /**
  274. * 删除某个field
  275. * @param key
  276. * @param field
  277. */
  278. public long hDel(String key, String field) {
  279. return redisTemplate.opsForHash().delete(key, field);
  280. }
  281. /**
  282. * 设置key过期时间
  283. * @param key
  284. * @param time
  285. * @param unit
  286. */
  287. public void expire(String key, long time, TimeUnit unit) {
  288. redisTemplate.expire(key, time, unit);
  289. }
  290. /**
  291. * 以list集合的形式添加数据
  292. *
  293. * @param key
  294. * @return
  295. */
  296. public Boolean hasKey(String key) {
  297. return redisTemplate.hasKey(key);
  298. }
  299. /**
  300. * 以list集合的形式添加数据
  301. *
  302. * @param key
  303. * @param values
  304. * @return
  305. */
  306. public Long lPushAll(String key, String... values) {
  307. return redisTemplate.opsForList().leftPushAll(key, values);
  308. }
  309. /**
  310. * 以list集合的形式添加数据
  311. *
  312. * @param key
  313. * @param values
  314. * @return
  315. */
  316. public Long lPushAll(String key, List<String> values) {
  317. return redisTemplate.opsForList().leftPushAll(key, values);
  318. }
  319. /**
  320. * 以list集合的形式添加数据
  321. *
  322. * @param key
  323. * @param values
  324. * @return
  325. */
  326. public Long rPushAll(String key, String... values) {
  327. return redisTemplate.opsForList().rightPushAll(key, values);
  328. }
  329. /**
  330. * 以list集合的形式添加数据
  331. *
  332. * @param key
  333. * @param values
  334. * @return
  335. */
  336. public Long rPushAll(String key, List<String> values) {
  337. return redisTemplate.opsForList().rightPushAll(key, values);
  338. }
  339. /**
  340. * 返回list集合下表区间的元素
  341. *
  342. * @param key
  343. * @param start
  344. * @param end
  345. * @return
  346. */
  347. public List<String> lRange(String key, long start, long end) {
  348. return redisTemplate.opsForList().range(key, start, end);
  349. }
  350. /**
  351. * 返回list集合的大小
  352. *
  353. * @param key
  354. * @return
  355. */
  356. public Long lsize(String key) {
  357. return redisTemplate.opsForList().size(key);
  358. }
  359. /**
  360. * 设置缓存过期时间
  361. *
  362. * @param key
  363. * @return
  364. */
  365. public void expire(String key, long time) {
  366. this.expire(key, time, TimeUnit.SECONDS);
  367. }
  368. /**
  369. * 执行lua脚本
  370. *
  371. * @param script
  372. * @param keys
  373. * @param args
  374. * @param <T>
  375. * @return
  376. */
  377. public <T> T execute(RedisScript<T> script, List<String> keys, String... args) {
  378. return (T) redisTemplate.execute(script, keys, args);
  379. }
  380. /**
  381. * 缓存存储并设置过期时间
  382. *
  383. * @param key
  384. * @param value
  385. * @param time
  386. * @param timeUnit
  387. * @return void
  388. */
  389. public void setex(String key, String value, long time, TimeUnit timeUnit) {
  390. redisTemplate.opsForValue().set(key, value, time, timeUnit);
  391. }
  392. /**
  393. * 缓存增量
  394. *
  395. * @param key
  396. * @param increment
  397. * @return java.lang.Long
  398. */
  399. public Long increment(String key, long increment) {
  400. return redisTemplate.opsForValue().increment(key, increment);
  401. }
  402. /**
  403. * 缓存失效时间
  404. *
  405. * @param key
  406. * @param timeUnit
  407. * @return java.lang.Long
  408. */
  409. public Long getExpire(String key, TimeUnit timeUnit) {
  410. return redisTemplate.getExpire(key, timeUnit);
  411. }
  412. /**
  413. * 指定缓存失效时间
  414. *
  415. * @param key
  416. * @param date
  417. * @return java.lang.Boolean
  418. */
  419. public Boolean expireAt(String key, Date date) {
  420. return redisTemplate.expireAt(key, date);
  421. }
  422. }
  1. /**
  2. * redis key 常量
  3. * @author xc
  4. */
  5. public class RedisKeyConstants {
  6. /**
  7. * 用户信息锁修改锁前缀
  8. */
  9. public static final String USER_UPDATE_LOCK_PREFIX = "user_update_lock:";
  10. /**
  11. * 用户锁信息锁前缀
  12. */
  13. public static final String USER_LOCK_PREFIX = "user_info_lock:";
  14. /**
  15. * 用户详情前缀
  16. */
  17. public static final String USER_INFO_PREFIX = "user_info_prefix:";
  18. }
  • 初始代码 ```java / 根据id查询 / @Override public ApUserRealname findById(Integer apUserId) {
    1. String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId;
    2. String jsonUser = stringRedisTemplate.opsForValue().get(userInfoKey);
    3. if(jsonUser == null){
    4. ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
    5. String redisJson = JSON.toJSONString(apUserRealname);
    6. //将数据放入到redis中
    7. stringRedisTemplate.opsForValue().set(userInfoKey,redisJson);
    8. //返回从数据库中查询到的数据
    9. return apUserRealname;
    10. }
    11. ApUserRealname apUserRealname = JsonUtil.json2Object(jsonUser, ApUserRealname.class);
    12. return apUserRealname;
    } / 更新修改 / @Override public void update(ApUserRealname apUserRealname) {
    1. apRealMapper.updateById(apUserRealname);
    }
  1. - 优化一:避免大量key同一时间过期,缓存雪崩问题(惊群问题)
  2. ```java
  3. @Override
  4. public void update(ApUserRealname apUserRealname) {
  5. String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserRealname.getUserId();
  6. apRealMapper.updateById(apUserRealname);
  7. //当把数据进行修改之后,我们需要把数据保存到redis里边去
  8. // 为了防止缓存雪崩,也就是缓存的集群效应,我们需要使用固定时间 + 随机时间
  9. redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
  10. }
  • 优化二:防止幂等性问题(使用优化过后的自旋锁,企业级开发几乎不会使用无脑的自旋锁,因为性能)

    1. @Override
    2. public void update(ApUserRealname apUserRealname) {
    3. //如果出现了网络卡顿,可能会导致一个请求,就可能导致一个请求发起了多次,导致这个操作进行了多次,修改是这个样子,保存其实也是这个样子
    4. String redisLockKey = RedisKeyConstants.USER_INFO_PREFIX + apUserRealname.getUserId();
    5. if(!redisLock.lock(redisLockKey)){
    6. //如果是false,就是触发幂等操作了
    7. throw new RuntimeException("请不要幂等操作");
    8. }
    9. try{
    10. redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
    11. }finally {
    12. //释放锁
    13. redisLock.unlock(userInfoKey);
    14. }
    15. }
  • 优化三:解决缓存穿透问题(并且设置了冷热数据过期,就是冷数据过期,热数据续期 )

    1. @Override
    2. public ApUserRealname findById(Integer apUserId) {
    3. String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId;
    4. //有没有一种可能性,这个代码是一个缓存穿透代码
    5. String jsonUser = stringRedisTemplate.opsForValue().get(userInfoKey);
    6. if (StringUtils.hasLength(jsonUser)) // 有对象 {}
    7. //说明是有内容的,但是这个内容可能是一个穿透代码
    8. if(Objects.equals(CacheSupport.EMPTY_CACHE,jsonUser)){
    9. //说明是一个穿透代码,直接给他返回一个空对象即可
    10. return new ApUserRealname();
    11. }
    12. //走到这里说明他并非是一个空对象,是确实有数据,那么我应该给当前这个数据进行续期
    13. redisCache.expire(userInfoKey,CacheSupport.generateCacheExpireSecond());
    14. //把他转换成json返回
    15. return JsonUtil.json2Object(jsonUser,ApUserRealname.class);
    16. }
    17. //说明redis中没有数据
    18. ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
    19. if(apUserRealname == null){
    20. //如果说数据库是个空,那么需要和之前的数据对应起来
    21. redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
    22. return new ApUserRealname();
    23. }
    24. //走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
    25. redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
    26. return apUserRealname;
    27. }
  • 优化四:抽取方法 ```java @Override public ApUserRealname findById(Integer apUserId) {

    String userInfoKey = RedisKeyConstants.USER_INFO_PREFIX + apUserId; ApUserRealname apUserRealname = readDataFromRedis(userInfoKey); if(!Objects.isNull(apUserRealname)){

    1. // redis中存在
    2. return apUserRealname;

    }

    return getDataFromDb(userInfoKey,apUserId); }

private ApUserRealname getDataFromDb(String userInfoKey ,Integer apUserId) {

  1. ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
  2. if(apUserRealname == null){
  3. //如果说数据库是个空,那么需要和之前的数据对应起来
  4. redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
  5. return new ApUserRealname();
  6. }
  7. //走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
  8. redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
  9. return apUserRealname;

}

  1. - 优化 五:解决缓存击穿问题
  2. ```java
  3. private ApUserRealname getDataFromDb(String userInfoKey ,Integer apUserId) {
  4. //要避免缓存击穿代码,热点key过期,大量线程涌入,直击数据库,添加分布式锁,但是不能添加自旋锁,避免一次自旋次数太多。
  5. String redisLockKey = RedisKeyConstants.USER_LOCK_PREFIX + apUserId;
  6. boolean lock = redisLock.lock(redisLockKey);
  7. if(!lock){
  8. //只有少部分可以进入到自旋中, 大部分人直接失败
  9. throw new RuntimeException("请稍后再查");
  10. }
  11. try{
  12. ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
  13. if(apUserRealname == null){
  14. //如果说数据库是个空,那么需要和之前的数据对应起来
  15. redisCache.set(userInfoKey,CacheSupport.EMPTY_CACHE,CacheSupport.generateCacheExpireSecond());
  16. return new ApUserRealname();
  17. }
  18. //走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间
  19. redisCache.set(userInfoKey,JsonUtil.object2Json(apUserRealname),CacheSupport.generateCacheExpireSecond());
  20. return apUserRealname;
  21. }finally {
  22. redisLock.unlock(userInfoKey);
  23. }
  24. }
  • 优化六:解决双写一致性问题,更新与查询用同一把锁
  • 优化七:doublecheck + tryLock 进行数据优化
    1. private ApUserRealname getDataFromDb(String userInfoKey, Integer apUserId) {
    2. //要避免缓存击穿代码,热点key过期,大量线程涌入,直击数据库,添加分布式锁,但是不能添加自旋锁,避免一次自旋次数太多。
    3. String redisLockKey = RedisKeyConstants.USER_LOCK_PREFIX + apUserId;
    4. boolean lock = false;
    5. try {
    6. //大家都尝试去抢锁,但是只能有一个人抢锁成功,如果在规定的时间之内,没有抢锁成功,那么就就会直接返回false
    7. //在1s的时间内,其实当前这个方法串行化是完全可以执行成功的,因为毕竟此时数据库的压力不大,
    8. lock = redisLock.tryLock(redisLockKey, 1);
    9. } catch (InterruptedException e) {
    10. ApUserRealname apUserRealname = readDataFromRedis(userInfoKey);
    11. if (apUserRealname != null) {
    12. return apUserRealname;
    13. }
    14. e.printStackTrace();
    15. }
    16. if (!lock) {
    17. //只有少部分可以进入到自旋中, 大部分人直接失败\
    18. ApUserRealname apUserRealname = readDataFromRedis(userInfoKey);
    19. if (apUserRealname != null) {
    20. return apUserRealname;
    21. }
    22. throw new RuntimeException("请稍后再查");
    23. }
    24. try {
    25. ApUserRealname redisData = readDataFromRedis(userInfoKey);
    26. if(redisData != null){
    27. return redisData;
    28. }
    29. //线程a,他就去访问这个数据库,他读取到了一个旧数据,然后这个时候,有人去修改了数据
    30. ApUserRealname apUserRealname = apRealMapper.selectById(apUserId);
    31. if (apUserRealname == null) {
    32. //如果说数据库是个空,那么需要和之前的数据对应起来
    33. redisCache.set(userInfoKey, CacheSupport.EMPTY_CACHE, CacheSupport.generateCacheExpireSecond());
    34. return new ApUserRealname();
    35. }
    36. //走到这儿就说明 redis中没有,db里边有,然后再设置一个过期时间、
    37. // 此时下边这个线程反应过来了,他接着向缓存中写入数据,旧的数据就覆盖了新的数据,这就是db+缓存双写问题
    38. redisCache.set(userInfoKey, JsonUtil.object2Json(apUserRealname), CacheSupport.generateCacheExpireSecond());
    39. return apUserRealname;
    40. } finally {
    41. redisLock.unlock(userInfoKey);
    42. }