为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而DB承担数据落盘工作(做持久化)。

那么哪些数据适合放入缓存呢?

即时性、数据一致性要求不高的

电商类应用,商品分类,商品列表等数据加入缓存,并设置失效时间为5分钟(一般根据数据更新频率来定),后台发布新的商品,买家可能需要5分钟之后才能看到新商品

访问量大且更新频率不高的数据(读多,写少)

电商类应用,商品参数等数据,用户会经常查询,但是录入之后,一般不会进行修改

读模式缓存一般的使用流程如下图:

3. 缓存&分布式锁[151-172] - 图1

1. 本地缓存与分布式缓存

1.1 本地缓存

假设我们在本地使用Map进行缓存工作,缓存和我们的应用是部署在同一台服务器的。

  1. data = cache.load(id);// 从缓存加载数据
  2. if(data == null){
  3. data = db.load(id);// 从数据库加载数据
  4. cache.put(id,data);// 保存到 cache 中
  5. }
  6. return data;

在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。

这种在本地完成缓存的方式,有以下问题:

  1. 这种方式没办法缓存大量的数据
  2. 如果是单体应用不会有问题,如果是分布式,这种方式就会有问题。

3. 缓存&分布式锁[151-172] - 图2

第一个问题:假设第一次请求,负载均衡到了A服务器,A服务器的本地缓存中没有数据,查询数据库之后将数据缓存在了A服务器的本地缓存中,第二次负载均衡到的是B服务器,B服务器的本地缓存中没有数据,那么还是得再去查询数据库。

第二个问题:如果第一次负载均衡到了A服务器,我们将A服务器上的数据修改了,并且修改了A服务器本地缓存中的数据,第二次负载均衡到了B服务器,B服务器缓存中的数据是没有修改的,那么我们取到的数据就不是修改后的数据,会造成数据不一致的问题。

1.2 分布式缓存

使用缓存中间件,大家共享同一个缓存来完成缓存工作,这样就不会有数据一致性问题,而且如果我们一台redis不能满足要求,或者性能不足,我们也可以把redis做集群。

3. 缓存&分布式锁[151-172] - 图3

2. 整合Redis

SpringBoot已经将Redis抽取为了一个场景启动器,所以我们在SpringBoot项目中整合Redis的步骤是:

  1. 首先服务器上要安装有redis

  2. 引入redis-starter的依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-data-redis</artifactId>
    4. </dependency>
  3. 配置redis

    spring:
     redis:
     host: 192.168.56.10
     port: 6379
    
  4. 使用 RedisTemplate 操作 redis

我们往缓存中放数据的时候,往往是放Json格式,因为Json格式可以跨平台,不论是PHP还是Java都可以解析Json格式的数据,为此,Spring专门封装了一个操作StringRedisTemplate来操作redis。

// 给缓存中放json字符串,拿出json字符串并逆转为可用的对象类型
public List<CategoryEntity> getCategoryJson() {

    // 1. 加入缓存逻辑,缓存中存的对象时Json字符串
    String jsonObj = redisTemplate.opsForValue().get("categoryJSON");

    if(StringUtils.isEmpty(jsonObj)){

        // 2. 缓存中没有,就去查数据库
        List<CategoryEntity> categoryEntities = listWithTree();

        // 3. 从数据库查出来的数据再转成Json放进缓存中
        String jsonString = JSON.toJSONString(categoryEntities);
        redisTemplate.opsForValue().set("categoryJSON",jsonString);

        return categoryEntities;
    }

    // 4. 转换为我们指定的对象
    List<CategoryEntity> categoryEntities = JSON.parseObject(jsonObj, new TypeReference<List<CategoryEntity>>() {});
    return categoryEntities;
}

上述方式一定会产生下图的异常:

3. 缓存&分布式锁[151-172] - 图4

异常产生原因:

SpringBoot2.0以后默认使用lettuce作为操作redis的客户端,lettuce使用netty进行网络通信,所以是lettuce的bug导致netty堆外内存溢出,因为内存没有得到及时的释放。netty如果没有指定对外内存,默认使用-Xmx指定的内存。理论上,只要我们-Xmx设置的足够大,可以延缓这个问题的出现,但是不能彻底解决,时间一长,还是会出现异常

解决方案:

  1. 升级lettuce客户端(目前没有解决这个异常的)
  2. 切换使用jedis客户端,去操作redis

我们使用第二种方案:

<!--排除lettuce,引入jedis-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <exclusions>
        <exclusion>
          <groupId>io.lettuce</groupId>
          <artifactId>lettuce-core</artifactId>
        </exclusion>
      </exclusions>
</dependency>

<dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
</dependency>

lettuce和jedis都是操作redis的底层客户端,RedisTemplate 是Spring对lettuce和jedis进行了再次封装

3. 缓存失效问题

3.1 缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

风险:
在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,数据库瞬时压力增大,这就是漏洞。

解决:
缓存空结果、并且设置短的过期时间。

3.2 缓存雪崩

缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决:
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

3.3 缓存击穿

对于一些设置了过期时间的 key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。

这个时候,需要考虑一个问题:如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。

解决:
加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db

3.3.1 本地锁

为了解决缓存击穿,我们引入了锁,那么要如何加锁呢?

// 给缓存中放json字符串,拿出json字符串并逆转为可用的对象类型
public List<CategoryEntity> getCategoryJson() {

    // 1. 加入缓存逻辑,缓存中存的对象时Json字符串
    String jsonObj = redisTemplate.opsForValue().get("categoryJSON");

    if(StringUtils.isEmpty(jsonObj)){

        // 2. 缓存中没有,就去查数据库
        List<CategoryEntity> categoryEntities = listWithTree();
        return categoryEntities;
    }

    // 4. 转换为我们指定的对象
    List<CategoryEntity> categoryEntities = JSON.parseObject(jsonObj, new TypeReference<List<CategoryEntity>>() {});
    return categoryEntities;
}

public List<CategoryEntity> listWithTree() {

    // 加锁
    synchronized (this){
        // 1、查出所有分类
        List<CategoryEntity> entities = baseMapper.selectList(null);

        // 2. 从数据库查出来的数据再转成Json放进缓存中
        String jsonString = JSON.toJSONString(categoryEntities);
        redisTemplate.opsForValue().set("categoryJSON",jsonString);

        return level1Menus;
    }
}

为什么要把“从数据库查出来的数据再转成Json放进缓存中”这一步放到加锁的逻辑中?

3. 缓存&分布式锁[151-172] - 图5

因为把结果放入缓存,需要进行网络传输,需要时间,假设我们把这一步放在加锁的逻辑之外,在把结果放入缓存时,锁已经被释放了,如果在这个过程中进来新的查询,会发现缓存中还是没有数据,会再次去查询数据库。将这一步放到加锁的逻辑中,把结果放入缓存之后,再释放锁,下一次查询,缓存中一定有数据

本地锁为什么不适用于分布式系统?

3. 缓存&分布式锁[151-172] - 图6

因为本地锁只能锁住当前进程,就算我们锁住了自己服务器的进程,保证一台服务器只有一个进程进行查询,那么假设我们有10台服务器,也会有10个进程去查询数据库。

3.3.2 分布式锁

分布式锁的原理如下图所示:我们可以去同一个地方占锁,如果占到锁,就可以执行逻辑,否则就必须等待,只到释放锁。

占锁可以在redis,数据库,或者任意大家能访问的地方。

3. 缓存&分布式锁[151-172] - 图7

3.3.2.1 使用redis占锁

  1. 分布式锁演进—阶段一

    public List<CategoryEntity> getCategoryJsonRedisLock() throws InterruptedException {
    
     // 1. 占锁/setIfAbsent=redis的SETNX:可以占坑返回true,否则返回false
     Boolean jsonObj = redisTemplate.opsForValue().setIfAbsent("lock","111");
     if(jsonObj){
         // 2. 执行业务逻辑
         List<CategoryEntity> categoryEntities = listWithTree();
         // 3. 业务逻辑结束之后,坑位释放
         redisTemplate.delete("lock");
         return categoryEntities;
     } else {
         // 没占上坑,休眠100ms,再试试
         Thread.sleep(100);
         return getCategoryJsonRedisLock();
     }
    }
    

    3. 缓存&分布式锁[151-172] - 图8

存在问题:setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁

解决方案:设置锁的自动过期,即使没有删除,会自动删除

  1. 分布式锁演进—阶段二

    public List<CategoryEntity> getCategoryJsonRedisLock() throws InterruptedException {
    
     // 1. 占锁/setIfAbsent=redis的SETNX:可以占坑返回true,否则返回false
     Boolean jsonObj = redisTemplate.opsForValue().setIfAbsent("lock","111");
     if(jsonObj){
         // 设置锁的自动过期时间
         redisTemplate.expire("lock",3000, TimeUnit.SECONDS);
         // 2. 执行业务逻辑
         List<CategoryEntity> categoryEntities = listWithTree();
         // 3. 业务逻辑结束之后,坑位释放
         redisTemplate.delete("lock");
         return categoryEntities;
     } else {
         // 没占上坑,休眠100ms,再试试
         Thread.sleep(100);
         return getCategoryJsonRedisLock();
     }
    }
    

    3. 缓存&分布式锁[151-172] - 图9

存在问题:setnx设置好,正要去设置过期时间,宕机。又死锁了。

解决方案:设置过期时间和占位必须是原子的。redis支持使用setnx ex命令

  1. 分布式锁演进—阶段三

    public List<CategoryEntity> getCategoryJsonRedisLock() throws InterruptedException {
    
     // 1. 占锁/setIfAbsent=redis的SETNX:可以占坑返回true,否则返回false
     Boolean jsonObj = redisTemplate.opsForValue().setIfAbsent("lock","111",3000,TimeUnit.SECONDS);
     if(jsonObj){
         // 设置锁的自动过期时间
         // redisTemplate.expire("lock",3000, TimeUnit.SECONDS);
         // 2. 执行业务逻辑
         List<CategoryEntity> categoryEntities = listWithTree();
         // 3. 业务逻辑结束之后,坑位释放
         redisTemplate.delete("lock");
         return categoryEntities;
     } else {
         // 没占上坑,休眠100ms,再试试
         Thread.sleep(100);
         return getCategoryJsonRedisLock();
     }
    }
    

    3. 缓存&分布式锁[151-172] - 图10

存在问题:占锁的问题解决了,但是删除锁的时候还有问题,如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。

解决方案:占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除

  1. 分布式锁演进—阶段四

    public List<CategoryEntity> getCategoryJsonRedisLock() throws InterruptedException {
    
     // 1. 占锁/setIfAbsent=redis的SETNX:可以占坑返回true,否则返回false
     String uuid = UUID.randomUUID().toString();
     Boolean jsonObj = redisTemplate.opsForValue().setIfAbsent("lock",uuid,3000,TimeUnit.SECONDS);
     if(jsonObj){
         // 设置锁的自动过期时间
         // redisTemplate.expire("lock",3000, TimeUnit.SECONDS);
         // 2. 执行业务逻辑
         List<CategoryEntity> categoryEntities = listWithTree();
         // 3. 业务逻辑结束之后,判断是不是自己的锁,坑位释放
         if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
             redisTemplate.delete("lock");
         }
         return categoryEntities;
     } else {
         // 没占上坑,休眠100ms,再试试
         Thread.sleep(100);
         return getCategoryJsonRedisLock();
     }
    }
    

存在问题:如果判断的时候确实是当前自己的锁,刚判断完,自己的锁过期了,别人已经设置到了新的值,然后我们再去删除,那么删除的还是别人的锁

解决方案:删除锁必须保证原子性。使用redis+Lua脚本完成

  1. 分布式锁演进—阶段五

    public List<CategoryEntity> getCategoryJsonRedisLock() throws InterruptedException {
    
     // 1. 占锁/setIfAbsent=redis的SETNX:可以占坑返回true,否则返回false
     String uuid = UUID.randomUUID().toString();
     Boolean jsonObj = redisTemplate.opsForValue().setIfAbsent("lock",uuid,3000,TimeUnit.SECONDS);
     if(jsonObj){
         // 设置锁的自动过期时间
         // redisTemplate.expire("lock",3000, TimeUnit.SECONDS);
         List<CategoryEntity> categoryEntities = null;
         // 2. 执行业务逻辑
         try {
             categoryEntities = listWithTree();
         } finally {
             // 3. 业务逻辑结束之后,坑位释放
             String script = "if redis.call('get', KEYS[1]) == ARGV[1] then returnredis.call('del', KEYS[1]) else return 0 end";
             redisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Arrays.asList("lock",uuid));
         }
    
         return categoryEntities;
     } else {
         // 没占上坑,休眠100ms,再试试
         Thread.sleep(100);
         return getCategoryJsonRedisLock();
     }
    }
    

保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性,就可以了。

但是还有另外一个问题,如果我们在锁的过期时间之内,没有执行完我们的业务逻辑,要怎么给锁自动续期呢?原理上,只要我们给锁的过期时间设置的足够长,也是可以的。

3.3.2.2 Redisson

本地锁只能锁住当前服务,没法锁住分布式的所有服务。redis提供了一个专业的redis分布式锁框架 ——Redisson
【Resisson官方文档】

  1. 使用程序化配置方式创建RedissonClient

    @Configuration
    public class MyRedissonConfig {
    
     @Bean(destroyMethod = "shutdown")
     public RedissonClient redissonClient(){
         Config config = new Config();
         config.useSingleServer().setAddress("redis://192.168.56.10:6379");
         RedissonClient redisson = Redisson.create(config);
         return redisson;
     }
    }
    
  2. 测试Redisson加锁功能

    public void hello(@RequestParam Map<String, Object> params){
    
     // 获取一把锁,只要锁的名字一样,就是同一把锁
     RLock lock = redissonClient.getLock("my-lock");
    
     // 加锁
     lock.lock();
     try {
         System.out.println("hello...........");
     } finally {
         // 解锁
         lock.unlock();
     }
    }
    

lock.lock()是一个阻塞式等待,如果拿不到锁,会一直等,只到拿到锁才会调用业务逻辑。意思就是假设有连个线程,线程A拿到锁,执行了业务逻辑方法,线程A没有释放锁之前,线程B拿不到锁,是不会去执行业务逻辑方法的,如果执行了业务逻辑方法,也意味着拿到了锁

3. 缓存&分布式锁[151-172] - 图11

假设我们在加锁之后,解锁之前宕机,也不会有死锁问题。默认加的锁都是30s时间。

锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删除
加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

源码:
/
lock.Lock(10,TimeUnit.SECONDS);//10秒自动解锁,自动解锁时间一定要大于业务的执行时间。//问题:Lock.Lock(10,TimeUnit.SECONDs );I在锁时间到了以后,不会自动续期。
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间//2、如果我们未指定锁的超时时间,就使用30 * 10oe【LockwatchdogTimeout看门狗的默认时间】;
/只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】 /internallockLeaseTime【看门狗时间】/ 3,10s

/ /
lock.Lock(10,TimeUnit.SECONDS);//10秒自动解锁,自动解锁时间一定要大于业务的执行时间。//问题:Lock.Lock(10,TimeUnit.SECONDs );I在锁时间到了以后,不会自动续期。
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间//2、如果我们未指定锁的超时时间,就使用30 * 10oe【LockwatchdogTimeout看门狗的默认时间】;
/只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】 /internallockLeaseTime【看门狗时间】/ 3,10s