缓存

概述

为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作(即:持久化工作)。
哪些数据适合使用缓存:
即时性、数据一致性要求不高的 ;
访问量大且更新频率不高的数据(读多,写少);
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。
image.png
注意: 在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。

本地缓存

什么是本地缓存?
缓存的组件,与我们的代码是属于同一个进程的。他们运行在同一个项目里面,在同一个jvm中,只相当于在本地保存一个副本。
比如,使用Map集合保存数据,就是一种本地缓存。
这种方式,
如果我们的应用只是一个单体应用,只部署在一台服务器,那么什么问题都没有,而且很快;但是,在分布式系统下,若应用部署在多台服务器,那么每台服务器都自带自己的本地缓存,这样的话,可能会出现问题:
1,访问到不同的服务器时,缓存中是否有数据的情况不一样。可能A服务器缓存有数据,直接读取,但是下次访问到B服务器,缓存没数据,就得再次查询数据库,存入缓存,才能读取数据;若再一次访问到C服务器,C服务器的缓存也没数据,那么也得再次查询数据;
2,会造成缓存中数据的不一致。如果对数据进行了修改,为了能够读取到正确的数据,就得修改缓存中的数据,假如修改请求负载均衡来到了A服务器,A中的缓存数据一并修改掉了,但是B、C及其他的多台服务器是没有收到修改请求的,缓存中的数据没有修改,因此造成各个缓存中的数据不一致。
基于此,我们应当使用分布式缓存。

分布式缓存

不应该把缓存放到本地的一个进程中,而是各个分布式的多台服务器,共享一个集中式的缓存中间件,比如redis。
除开能够避免本地缓存的问题外,分布式缓存还可以在假如一台redis容量不够或性能不足的情况下,进行集群,也可以分片存储,打破了本地缓存的容量限制。

整合Redis

安装好redis以后,来整合redis。
以gulimall-product为例:
1.在项目的pom文件中,引入redis的springboot起步依赖:

  1. <!-- 引入redis-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>

2.然后在配置文件中,配置redis相关信息:

spring:  
    redis:
      host: 127.0.0.1

这里配置类redis所在的端口号,一般这样就可以使用了,其他的还可以配置redis的端口号(不配置默认使用6379)、密码,默认使用的哪个库等等。
3.然后就可以使用SpringBoot自动配置好的StringRedisTemplate/ReidsTemplate来操作Redis:
ReidsTemplate对应Map,StringRedisTemplate对应Map,存放数据key,数据的值value。

测试使用:

 @Autowired
    StringRedisTemplate stringRedisTemplate;
    @Test
    public void testStringRedisTemplate() {
        ValueOperations<String, String> operations = stringRedisTemplate.opsForValue();
        // 保存
        operations.set("k1","v1_"+ UUID.randomUUID().toString());
        // 查询
        String value = operations.get("k1");
        System.out.println("k1对应的数据:"+value);
    }

使用缓存改造获取商品三级分类业务

简单改造,使用缓存逻辑,暂时没有考虑数据修改、设置缓存到期时间等特殊情况:

@Override
    public Map<String, List<Catelog2VO>> getCatalogJson() {
        // 给缓存中放json字符串,拿出json字符串时,要逆转为能用的对象类型 (序列化与反序列化)

        // 1.加入缓存逻辑,缓存中存的数据是json字符串
        // JSON的好处:跨平台、跨语言兼容
        String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
        if (StringUtils.isEmpty(catalogJSON)) {
            // 2.缓存中没有数据,查询数据库
            Map<String, List<Catelog2VO>> catalogJsonFromDb = getCatalogJsonFromDb();
            // 3.将查到的数据放入缓存,将查出的对象转为json
            String jsonString = JSON.toJSONString(catalogJsonFromDb);
            redisTemplate.opsForValue().set("catalogJSON",jsonString);
            return catalogJsonFromDb;
        }

        // 转为我们指定的对象
        Map<String, List<Catelog2VO>> result = JSON.parseObject(
                catalogJSON,new TypeReference<Map<String, List<Catelog2VO>>>(){});

        return result;
    }

Redis相关内存泄露及解决

大并发、压力测试时,出现内存泄漏、崩溃的OutOfDirectMemoryError(堆外内存溢出)。
产生的原因:
//1)、springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信。
//2)、lettuce的bug导致netty堆外内存溢出 -Xmx300m;netty如果没有指定堆外内存,默认使用-Xmx300m
// 可以通过-Dio.netty.maxDirectMemory进行设置
解决方案
//解决方案:不能使用-Dio.netty.maxDirectMemory只去调大堆外内存。
真正的解决方案有下面两个
1)、升级lettuce客户端。
2)、切换使用jedis
redisTemplate:
lettuce、jedis操作redis的底层客户端。Spring对他们再次封装成redisTemplate;
所以无论底层使用的lettuce还是jedis,在spring中操作redis,直接使用redisTemplate就行。

jedis是个老版的客户端。

这个问题目前新的版本依赖应该已经没了。了解下这个问题,使用lettuce客户端,性能很好,jedis比较慢,现在应该很少用了,现在应该也不会出现这个堆外内存溢出的问题。

如何使用jedis

修改pom文件,原本引入的redis依赖为:

 <!--        引入redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

修改后的pom文件,引入的依赖为:

<!--        引入redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

缓存失效问题及解决

1、缓存穿透

什么是缓存穿透?
缓存穿透是指查询一个一定不存在的数据,由于缓存未不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的 null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。 (在缓存中查询一个用不存在的数据,使得请求定然会转给DB)

有何风险?
在流量大时,可能 DB就挂掉了,要是有人利用不存在的 key频繁攻击我们的应用,这就是漏洞。

如何解决?
缓存空结果、并且设置短的过期时间。

2、缓存雪崩

什么是缓存雪崩?
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失 效,请求全部转发到 DB,DB瞬时压力过重雪崩。 (缓存中存储的数据,同一时刻大面积失效,请求同一时刻全都转给DB)

如何解决?
原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

3、缓存击穿

什么是缓存击穿?
对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。
如果这个 key在大量请求同时进来前正好失效,那么所有对这个 key的数据查询都落到 db,我们称为缓存击穿。

如何解决?
加锁。大量并发只让一个人去查,其他人等待,查到以后释放锁。其他人获得到锁,先查缓存,就会有数据,不用去DB。

本地锁

在 使用缓存改造获取商品三级分类业务 这一节中,简单的使用了redis缓存,但是没有考虑上一节中,缓存穿透、雪崩、击穿的情况及其解决,现在要解决这些问题:
1、空结果缓存:解决缓存穿透
2、设置过期时间(加随机值):解决缓存雪崩

3、加锁:解决缓存击穿
这里1和2都比较好解决,重点在于3,如何加锁,本地锁与分布式锁等。
本地锁只能锁住当前进程,我们需要的是分布式锁(当然,分布式锁的性能相对于i本地锁来说较慢)。
本地锁,使用synchorized同步代码块的方式,将 确认缓存中没有数据、查数据库、将查询结果放入缓存这几个操作,作为一个原子操作,放到synchorized同步代码块中。
这在分布式情况下,会有问题,即:本地锁只能锁住当前进程。

分布式锁

基本原理:
请求可以去同一个地方”占坑”,如果占到,就执行逻辑,否则就必须等待,知道释放锁。“占坑”可以去redis,可以去数据库,可以去任何大家都能到达的地方。

一点一点来演进分布式锁。

阶段一,分布式锁的简单逻辑:

//从数据库查询并封装分类数据,使用redis锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedisLock() {

        // 1.占用分布式锁  -> 去redis占坑
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        if (lock) {
            // 加锁成功,执行业务
            Map<String, List<Catelog2VO>> dataFromDb = getDataFromDb();
            // 执行完业务要删除锁
            redisTemplate.delete("lock");
            return dataFromDb;
        } else {
            // 加锁失败
            // 休眠100ms重试
            return getCatalogJsonFromDbWithRedisLock();  //自旋的方式
        }

    }

这里面,会有一个问题,那就是redis中,set nx的方式占好了位,如果业务代码出现异常,或者程序在执行到这里时宕机,没有执行删除锁的逻辑,就会造成死锁。
这时,我们可以给锁redis锁设置一个过期时间,这样锁即使没有被代码逻辑删除,也会自动删除。

阶段二,redis锁设置过期时间:

//从数据库查询并封装分类数据,使用redis锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedisLock() {

        // 1.占用分布式锁  -> 去redis占坑
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        if (lock) {
            // 加锁成功,执行业务
            // 2、给redis锁设置过期时间
            redisTemplate.expire("lock",30,TimeUnit.SECONDS);
            Map<String, List<Catelog2VO>> dataFromDb = getDataFromDb();
            // 执行完业务要删除锁
            redisTemplate.delete("lock");
            return dataFromDb;
        } else {
            // 加锁失败
            // 休眠100ms重试
            return getCatalogJsonFromDbWithRedisLock();  //自旋的方式
        }

    }

这时,面临另一个问题,如果set nx设置好的时候,在执行设置过期时间的逻辑前,服务器宕机,又会造成死锁。
这就要求我们设置过期时间和占位必须是原子的,redis支持使用setnx ex命令:

阶段三,设置过期时间和占位是原子的

//从数据库查询并封装分类数据,使用redis锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedisLock() {

        // 1.占用分布式锁  -> 去redis占坑
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS);
        if (lock) {
            // 加锁成功,执行业务
            //2、设置过期时间,必须和加锁是同步的,原子的
            //redisTemplate.expire("lock",30,TimeUnit.SECONDS);
            Map<String, List<Catelog2VO>> dataFromDb = getDataFromDb();
            // 执行完业务要删除锁
            redisTemplate.delete("lock");
            return dataFromDb;
        } else {
            // 加锁失败
            // 休眠100ms重试
            return getCatalogJsonFromDbWithRedisLock();  //自旋的方式
        }

    }

这时,面临的下一个问题是:执行的锁的值是指定的一个固定的值,删除锁时,直接删除的话,如果由于业务时间很长,锁自己过期了,我们直接删除锁,有可能把别的线程正在把持的锁删掉了。
解决方案:占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除。

阶段四,占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除:

 //从数据库查询并封装分类数据,使用redis锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedisLock() {

        // 1.占用分布式锁  -> 去redis占坑
        // 占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除
        String uuid = UUID.randomUUID().toString();  // 占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            // 加锁成功,执行业务
            //2、设置过期时间,必须和加锁是同步的,原子的
            //redisTemplate.expire("lock",30,TimeUnit.SECONDS);
            Map<String, List<Catelog2VO>> dataFromDb = getDataFromDb();
            // 执行完业务要删除锁
            String lockValue = redisTemplate.opsForValue().get("lock");
            if(uuid.equals(lockValue)){
                //删除自己的锁
                redisTemplate.delete("lock");//删除锁
            }
            return dataFromDb;
        } else {
            // 加锁失败
            // 休眠100ms重试
            return getCatalogJsonFromDbWithRedisLock();  //自旋的方式
        }

    }

这时,也还会面临新的问题:如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置了新的值,那么删除的是别人的锁。
解决方案:删除锁必须保证原子性,使用redis+lua脚本完成。

阶段五:删除锁时必须保证原子性,使用redis+lua脚本完成

//从数据库查询并封装分类数据,使用redis锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedisLock() {

        // 1.占用分布式锁  -> 去redis占坑
        // 占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除
        String uuid = UUID.randomUUID().toString();  // 占锁的时候,值指定为uuid,每个线程匹配是自己的锁才删除
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            System.out.println("获取分布式锁成功...");
            // 加锁成功,执行业务
            //2、设置过期时间,必须和加锁是同步的,原子的
            //redisTemplate.expire("lock",30,TimeUnit.SECONDS);
            Map<String, List<Catelog2VO>> dataFromDb;
            try {
                dataFromDb = getDataFromDb();
            } finally {
                //获取值对比+对比成功删除,这需要是一个原子操作,这里可以使用lua脚本解锁,来保证原子性
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                //删除锁 (原子删锁)
                Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class)
                        , Arrays.asList("lock"), uuid);
            }

            // 执行完业务要删除锁
            //获取值对比+对比成功删除,这需要是一个原子操作,这里可以使用lua脚本解锁,来保证原子性
//            String lockValue = redisTemplate.opsForValue().get("lock");
//            if(uuid.equals(lockValue)){
//                //删除自己的锁
//                redisTemplate.delete("lock");//删除锁
//            }
            return dataFromDb;
        } else {
            System.out.println("获取分布式锁失败...等待重试");
            // 加锁失败
            // 休眠200ms重试
            try {
                Thread.sleep(200);
            } catch (Exception e) {

            }
            return getCatalogJsonFromDbWithRedisLock();  //自旋的方式
        }

    }

综合来说,就是要保证加锁的【占位+过期时间】和解锁的【判断+删除】的原子性。这里完成后,还有个更难的事情:
如何实现锁的自动续期?

阶段六:使用Redisson锁:

  //从数据库查询并封装分类数据,使用Redisson锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedissonLock() {

        // 1.锁的名字一样,就会上锁。  锁的名字牵扯到锁的粒度。锁的粒度越细,运行越快
        //  锁的粒度:具体缓存的是某个数据,11-号商品;  product-11-lock product-12-lock   product-lock
        RLock lock = redisson.getLock("catalogJson-lock");
        lock.lock();

        Map<String, List<Catelog2VO>> dataFromDb;
        try {
            dataFromDb = getDataFromDb();
        } finally {
            lock.unlock();
        }

        return dataFromDb;
    }


分布式锁Redisson

Redisson简介与整合

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工 具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式 系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

我们以后使用Redisson来完成所有的分布式锁功能。

Redisson里面的锁与JUC中的锁基本差不多,用法都差不多。

锁的名字一样,就会上锁。 锁的名字牵扯到锁的粒度。锁的粒度越细,运行越快。

整合步骤:
1、导入Redisson的依赖(以gulimall-product为例):
Redisson有自己的maven依赖,也提供了springboot的起步依赖。如果使用springboot的起步依赖,那么很多配置已经默认配置好了,我们只需要进行一些简单的配置就可以使用。 现在刚开始学习,我们就导入Redisson自己的maven依赖,把里面的一些东西自己写一遍,这样深入来理解和学习一些:

 <!-- 以后使用redisson作为所有分布式锁,分布式对象等功能框架-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.0</version>
        </dependency>

2、配置Redisson
可以程序化配置(写配置类)或者文件化配置(写在配置文件中)来配置,我们这里使用程序化配置来实现配置Redisson:

package com.atguigu.gulimall.product.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class MyRedissonConfig {

    /**
     * 所有对Redisson的使用都是通过RedissonClient对象
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson(@Value("${spring.redis.host}") String url) throws IOException {
        //1、创建配置
        //Redis url should start with redis:// or rediss://
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url+":6379");
        //2、根据Config创建出RedissonClient示例
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }
}

注意,如果redis设置了密码,要加上设置密码的setPassword方法。
测试:

@Autowired
    RedissonClient redissonClient;


    @Test
    public void testRedisson() {
        System.out.println(redissonClient);
    }

打印出了RedissonClient的实例地址,说明正确配置上,可以使用了:

org.redisson.Redisson@6ebbc06

Redisson之lock锁测试

 @Autowired
    RedissonClient redisson;

    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        // 通过RedissonClient实例调用getLock()方法获取一把锁,
        // 只要锁的名字一样,就是同一把锁
        RLock myLock = redisson.getLock("my-lock");
        // 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
        myLock.lock();  // 这个方法是阻塞式等待,加不到锁会一直等待,知道加上锁
        try {
            System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
            Thread.sleep(30000);
        }catch (Exception e) {

        }finally {
            // 3.解锁方法
            System.out.println("释放锁..."+Thread.currentThread().getId());
            myLock.unlock();
        }

        return "hello";
    }

开启两个网页,地址栏分别都访问http://localhost:10000/hello,会发现,第一次访问结束,第二次才会访问进去,打印如下:

加锁成功,执行业务…101 释放锁…101 加锁成功,执行业务…102 释放锁…102

这里,假设解锁方法因为服务器宕机等问题没有解锁,也不会出现死锁问题,
因为Redisson的lock有如下特点:
加锁时,默认加的锁都是30s的时间;
锁会自动续期,假设业务超长,运行期间会自动给锁续上新的30s,不用担心业务过长,锁会被自动删掉;
加锁业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁也会在30s后自动删除;

Redisson之lock看门狗原理&Redisson如何解决死锁问题

我们来自己指定锁时间:

 @Autowired
    RedissonClient redisson;

    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        // 通过RedissonClient实例调用getLock()方法获取一把锁,
        // 只要锁的名字一样,就是同一把锁
        RLock myLock = redisson.getLock("my-lock");
        // 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
//        myLock.lock();  // 这个方法是阻塞式等待,加不到锁会一直等待,直到加上锁,默认加的锁都是30s时间。
        //1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
        //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
        myLock.lock(10, TimeUnit.SECONDS); //10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
        try {
            System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
            Thread.sleep(30000);
        }catch (Exception e) {

        }finally {
            // 3.解锁方法 假设解锁代码没有运行,redisson会不会出现死锁
            System.out.println("释放锁..."+Thread.currentThread().getId());
            myLock.unlock();
        }

        return "hello";
    }

这里要注意,自己设置的自动解锁时间一定要大于业务的执行时间,而且,自己设置超时时间的方式加锁,在锁的时间到了以后,不会自动续期。
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
//2、如果我们未指定锁的超时时间,就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间】;只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期,续成30s。 隔多久续期一次: internalLockLeaseTime【看门狗时间】 / 3 (三分之一的看门狗时间,也就是10s)。 也就是说,此时默认是30秒的超时时间,并且。每隔十秒续期一次,重新续成三十秒。

但是实际开发中,推荐如下方式使用:
实际开发,还是推荐明显的指定超时时间:
lock.lock(30,TimeUnit.SECONDS);省掉了整个续期操作。手动解锁
只不过这里自己把指定的时间设的大一点,比如就是30s,防止业务执行时间过长而超时。业务执行成功后,就手动解锁。
因为如果业务时间过长,那说明这中间出了问题,完蛋了
即:加锁时设置超时时间,业务成功后手动解锁。 因为如果时间很长业务还没执行完,那么考虑优化业务性能吧。

Redisson之读写锁ReadWriteLock


    @Autowired
    RedissonClient redisson;

    @Autowired
    StringRedisTemplate redisTemplate;

//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁
    //写锁没释放读就必须等待
    // 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
    // 写 + 读: 等待写锁释放
    // 写 + 写: 阻塞方式
    // 读 + 写: 有读锁。写也需要等待。
    // 只要有写的存在,都必须等待
    @GetMapping("/write")
    @ResponseBody
    public String writeValue() {

        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");

        String uuid = "";
        RLock writeLock = readWriteLock.writeLock();
        try {
            // 1.改数据,加写锁;读数据,加读锁
            writeLock.lock();

            uuid = UUID.randomUUID().toString();
            // 休眠30s,模拟写入数据要花费时间
            Thread.sleep(30000);
            redisTemplate.opsForValue().set("writeValue",uuid);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }

        return uuid;
    }

    @GetMapping("/read")
    @ResponseBody
    public String readValue() {

        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");

        String uuid = "";
        // 加读锁
        RLock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            uuid = redisTemplate.opsForValue().get("writeValue");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }

        return uuid;
    }

使用读写锁的好处:
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁。
//写锁没释放读就必须等待
// 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
// 写 + 读: 等待写锁释放
// 写 + 写: 阻塞方式,下一个 写锁必须等待上一个写锁完全释放,才能进行;
// 读 + 写: 有读锁。写也需要等待。
// 只要有写的存在,都必须等待

Redisson之信号量Semaphore


  @Autowired
    RedissonClient redisson;


/**
     * Redisson信号量使用场景模拟
     * 车库停车,
     * 假设有3车位,来一辆车占用一个车位,走一辆车释放一个车位,想要停车,要看车位够不够
     *
     * 信号量也可以用作分布式限流,来限制每一个应用的流量;
     */
    @GetMapping("/park")
    @ResponseBody
    public String park() throws InterruptedException {
        RSemaphore park = redisson.getSemaphore("park");
        //获取一个信号(可以看作获取一个值),此场景看作占一个车位
        // acquire()是阻塞式获取,即一定要获取一个值,获取不到会一直等待获取
//        park.acquire();
        // tryAcquire()是尝试获取,获取得到就获取,获取不到,就返回false
        boolean b = park.tryAcquire();
        if(b){
            //执行业务
        }else {
            return "error";
        }

        return "ok=>"+b;
    }

    @GetMapping("/go")
    @ResponseBody
    public String go() {
        RSemaphore park = redisson.getSemaphore("park");
        park.release(); //释放一个信号,此场景看作释放一个车位
        return "ok";
    }

Redisson之闭锁CountDownLatch


 @Autowired
    RedissonClient redisson;


/**
     * 闭锁 案例
     *
     * 放假,锁门
     * 1班没人了,2班没人了......
     * 5个班全部走完,我们可以锁大门
     */
    @GetMapping("/lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.trySetCount(5);
        door.await(); //等待闭锁都完成

        return "放假了...";
    }

    @GetMapping("/gogogo/{id}")
@ResponseBody
    public String gogogo(@PathVariable("id") Long id){
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.countDown();//计数减一;

//        CountDownLatch

        return id+"班的人都走了...";
    }

缓存数据与DB数据一致性

先利用Redisson改造原本的 获取商品三级分类业务 的接口,改造后如下:

  //从数据库查询并封装分类数据,使用Redisson锁的方式
    public Map<String, List<Catelog2VO>> getCatalogJsonFromDbWithRedissonLock() {

        // 1.锁的名字一样,就会上锁。  锁的名字牵扯到锁的粒度。锁的粒度越细,运行越快
        //  锁的粒度:具体缓存的是某个数据,11-号商品;  product-11-lock product-12-lock   product-lock
        RLock lock = redisson.getLock("catalogJson-lock");
        lock.lock();

        Map<String, List<Catelog2VO>> dataFromDb;
        try {
            dataFromDb = getDataFromDb();
        } finally {
            lock.unlock();
        }

        return dataFromDb;
    }

这时,牵扯到一个问题:即缓存数据一致性问题。 如何保证缓存里面的数据和数据库里面数据的一致性?
常用有两种方式:
1)双写模式:数据库修改后,就跟着也把缓存中的数据也做修改;
2)失效模式:数据库修改后,直接将缓存删掉,等待下次主动查询进行更新;
当然,这两种模式在大并发下都会产生一定的漏洞:
双写模式:会产生脏数据
image.png
这时,可以在更新数据时,加锁来解决。一个线程更新时,得到锁,等这个线程全更新完了,释放锁后,其他线程才能得到锁。或者,如果对数据的准确性(一致性)要求不是特别高,可以在缓存设计时,就设置一个过期时间,等到缓存过期后,重新写入缓存数据,以此来保证数据的最终一致性即可。

失效模式:同样可能产生脏数据。
image.png
这个问题同样可以和双写模式一样,通过加锁来解决,但是,加锁后的系统会比较笨重一些,即性能较低。

这里就要注意一点,经常性修改的数据,且对读的准确性要求很高的数据,就应该直接读取数据库。

image.png

所以,基于双写模式或失效模式,改进方案:
改进方案1:
使用分布式读写锁。读数据等待写数据整个操作完成之后才能获取锁。
改进方案2:
使用cananl
image.png
cananl是阿里开源的一个中间件,可以伪装mysql的从库。具体操作后续学习。

现在谷粒商城系统的一致性解决方案:
1、缓存的所有数据都有过期时间;这样即使有脏数据,缓存过期后,下次查询也能触发主动更新;
2、读写数据的时候,加上分布式的读写锁;经常写经常读的话,对性能肯定有极大的影响,然是如果偶尔写一次,读的次数很频繁,那么影响微乎其微。
(读写锁最大的特点:读操作相当于无锁状态)
目前该谷粒商城项目采用失效模式,在修改了数据后,直接删除缓存,等待下次用户查询时,进行更新缓存数据。同时,为了避免更新导致的一些乱序问题,在这个失效模式中,加上分布式的读写锁。

SpringCache

简介

Spring从 3.1开始定义了 org.springframework.cache.Cache和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术;并支持使用 JCache(JSR-107)注解简化我们开发;

Cache接口为缓存的组件规范定义,包含缓存的各种操作集合;
Cache 接口下 Spring 提供了各种 xxxCache 的实现; 如 RedisCache , EhCacheCache , ConcurrentMapCache 等;

每次调用需要缓存功能的方法时,Spring会检查检查指定参数的指定的目标方法是否已经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓 存结果后返回给用户。下次调用直接从缓存中获取。

使用 Spring缓存抽象时我们需要关注以下两点;
1、确定方法需要被缓存以及他们的缓存策略
2、从缓存中读取之前缓存存储的数据

整合SpringCache简化redis缓存开发

第一步:引入SpringCache的依赖,同时,以redis作为缓存,也引入redis的springboot的起步依赖 (以gulimall-product为例):

 <!-- SpringCache依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>

        <!-- redis在springboot中的起步依赖 -->
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session-data-redis</artifactId>
        </dependency>

第二步:写配置
首先,springboot会自动配置一些东西:
CacheAutoConfiguration会导入RedisCacheConfiguration;这样一来,在RedisCacheConfiguration中就自动配置好了缓存管理器RedisCacheManger,同时也配置好了一些别的东西。
那么我们要配置什么:
要在配置文件中,指名使用redis作为缓存:
当然这是最简化的配置,后面需要配置什么用到了再学习:

spring.cache.type=redis

测试通过@Cacheable注解使用缓存

这里我们只需要掌握SpringCache的几个注解:
@Cacheable: Triggers cache population.:触发将数据保存到缓存的操作
@CacheEvict: Triggers cache eviction.:触发将数据从缓存删除的操作
@CachePut: Updates the cache without interfering with the method execution.:不影响方法执行更新缓存
@Caching: Regroups multiple cache operations to be applied on a method.:组合多个操作
@CacheConfig: Shares some common cache-related settings at class-level.:在类级别共享缓存的相同配置

使用步骤:_1.在springboot的启动类上,标注 @EnableCaching注解开启缓存功能
2.只需要使用注解就能完成缓存操作
例:
启动类标注@EnableCaching注解:

@EnableCaching
@EnableFeignClients(basePackages = "com.atguigu.gulimall.product.feign")
@EnableDiscoveryClient
@MapperScan("com.atguigu.gulimall.product.dao")
@SpringBootApplication
public class GulimallProductApplication {

    public static void main(String[] args) {
        SpringApplication.run(GulimallProductApplication.class, args);
    }

}

controller下的方法:

 @GetMapping({"/","/index.html"})
    public String indexPage(Model model) {

        // 查出所有的一级分类
        List<CategoryEntity> categoryEntities =  categoryService.getLevel1Categories();
        model.addAttribute("categories",categoryEntities);
        // 视图解析器进行拼串:
        // classpath:/templates/ +返回值+  .html
        return "index";
    }

里面categoryService.getLevel1Categories();在getLevel1Categories()中使用springcache缓存:

@Cacheable({"category"})
    @Override
    public List<CategoryEntity> getLevel1Categories() {
        System.out.println("getLevel1Categories.....");
        List<CategoryEntity> categoryEntities = baseMapper
                .selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
        return categoryEntities;
    }

SpringCache细节设置及说明

规则说明 1、每一个需要缓存的数据我们都来指定要放到哪个名字的缓存。
【建议缓存的分区(按照业务类型分)】
2、 @Cacheable({“category”})
代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。
如果缓存中没有,会调用方法,最后将方法的结果放入缓存
3、默认行为
1)、如果缓存中有,方法不用调用。
2)、key默认自动生成;缓存的名字::SimpleKey
3)、缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
4)、默认ttl时间 -1; 默认ttl时间即默认过期时间,这里面默认是-1,就是没设置过期时间,这不符合我们的设计规范,我们要的是所有的缓存数据都要有默认过期时间。
我们可以通过自定义一些规范来设定默认过期时间、序列化机制等

4、自定义规则
1)、指定生成的缓存使用的key: 设置@Cacheable注解的key属性指定,这个属性接收一个SpEL表达式
SpEL的详细信息参考:
https://docs.spring.io/spring/docs/5.1.12.RELEASE/spring-framework-reference/integration.html#cache-spel-context
例:
@Cacheable
(value = {“category”},key = “‘level1Categories’”)
@Override
public List
<_CategoryEntity_> getLevel1Categories(){}
或:
@Cacheable
(value = {“category”},key = “#root.method.name”)
@Override
public List
<_CategoryEntity_> getLevel1Categories(){}
2)、指定缓存的数据的存活时间: 在配置文件中修改ttl
# 指定缓存的存活时间,是以毫秒为单位的
spring.cache.redis.time-to-live=3600000
3)、将数据保存为json格式:
自定义RedisCacheConfiguration即可

SpringCache原理
CacheAutoConfiguration -> RedisCacheConfiguration ->
自动配置了RedisCacheManager->初始化所有的缓存->每个缓存决定使用什么配置
->如果redisCacheConfiguration有就用已有的,没有就用默认配置
->想改缓存的配置,只需要给容器中放一个RedisCacheConfiguration即可
* ->就会应用到当前RedisCacheManager管理的所有缓存分区中_

缓存自动配置类CacheAutoConfiguration中,帮我们导入了Redis的RedisCacheConfiguration;而RedisCacheConfiguration中自动配置类缓存管理器RedisCacheManager;
RedisCacheManager会初始化所有的缓存,每个缓存决定使用什么配置;
这一步如何决定的呢?
-> 如果有RedisCacheConfiguration就用已有的,没有就用默认配置;
-> 想修改缓存的配置,只需要给容器中放一个RedisCacheConfiguration即可;
-> 这个放到容器的RedisCacheConfiguration就会应用到当前RedisCaacheManager管理的 所有缓存分区中。

SpringCache自定义缓存配置

可以通过自定义SpringCache的缓存配置,来设定数据保存格式等,比如把数据保存为json格式:

package com.atguigu.gulimall.product.config;

import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 本类说明:
 *  缓存配置类
 * @author yuanhai
 */
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching   //  这里配置了@EnableCaching注解的话,启动类上就不需要配置@EnableCaching注解了
public class MyCacheConfig {

//    @Autowired
//    CacheProperties cacheProperties;

    /**
     * 配置文件中的东西没有用上;
     *
     * 1、原来和配置文件绑定的配置类是这样子的
     *      @ConfigurationProperties(prefix = "spring.cache")
     *      public class CacheProperties
     *   这里只要它没有放到容器中,那就不会起作用
     *
     * 2、要让他生效
     *      @EnableConfigurationProperties(CacheProperties.class)
     *
     * @return
     */
    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){


        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
//        config = config.entryTtl();
        // 设置缓存key的序列化格式,不变,还是用原本的StringRedisSerializer
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        // 设置缓存value的序列化格式,使用json格式
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));


        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        //将配置文件中的所有配置都生效
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }


        return config;
    }

}

@CacheEvict注解和@CachePut注解

@CacheEvict相当于保证缓存和DB数据一致性的失效模式
@CachePut相当于保证缓存和DB数据一致性的双写模式

@CacheEvict的使用:

 // value指定删哪个区域,key指定删哪个key的缓存
@CacheEvict(value = "category",key = "'getLevel1Categories'") 
@Transactional
@Override
public void updateCascade(CategoryEntity category) {
    this.updateById(category);
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());

}
  /**
     * 功能描述:
     * 级联更新所有关联的数据
     *
     * @CacheEvict:失效模式
     * 1、同时进行多种缓存操作  @Caching
     * 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true)
     * 3、存储同一类型的数据,都可以指定成同一个分区。分区名默认就是缓存的前缀
     *
     * @author yuanhai
     * @date 2022/2/15
     * @param category
     * @return void
     */
//    @CacheEvict(value = "category",key = "'getLevel1Categories'")  // value指定删哪个区域,key指定删哪个key的缓存

//    @Caching(evict = {
//            @CacheEvict(value = "category",key = "'getLevel1Categorys'"),
//            @CacheEvict(value = "category",key = "'getCatalogJson'")
//    })
    @CacheEvict(value = "category",allEntries = true) //失效模式
//    @CachePut //双写模式
    @Transactional
    @Override
    public void updateCascade(CategoryEntity category) {
        this.updateById(category);
        categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());

        // 修改了数据后,为保证缓存和DB数据的一致性,可以用如下两种方式的其一
        //1、同时修改缓存中的数据
        //2、直接删掉缓存中的数据,redis.del("catalogJSON");等待下次主动查询进行更新

    }

Spring-Cache的不足和解决


读模式:
1.缓存穿透问题:查询一个null数据。
解决:缓存空数据;配置文件指定ache-null-values=true
# 指定缓存空值,防止缓存穿透
spring.cache.redis.cache-null-values=true

2.缓存击穿问题:大量并发进来同时查询一个正好过期的数据。
解决:加锁;默认是无加锁的;
@Cacheable设置sync属性,sync = true(加锁,解决击穿)
@Cacheable(_value = {“category”},key = “#root.method.name”,sync = true)
@Override
public List
<_CategoryEntity_> getLevel1Categories(){}_

  **3.缓存雪崩问题:大量的key同时过期。**<br />**解决:加随机时间。**<br />**只要加上过期时间即可。配置文件中配置:**<br />**spring.cache.redis.time-to-live=360000**

写模式:(要保证缓存与数据库一致)
1)、读写加锁。
2)、引入Canal,感知到MySQL的更新去更新数据库
3)、读多写多,直接去数据库查询就行
总结:
常规数据(读多写少,即时性,一致性要求不高的数据);完全可以使用Spring-Cache;写模式(只要缓存的数据有过期时间就足够了)
特殊数据:特殊设计