缓存

概述

首页的访问量非常大,而首页中的商品类目访问量更大,鼠标移动就在访问,查询所有的数据,如果每次访问都实时到数据库获取数据,数据库的访问压力太大。

而这些信息一般更新的频率比较低,短时间内不会发生改变。因此,我们可以考虑在前台系统中,增加一层缓存,把这些数据缓存起来,请求到来时,不再调用数据接口,而是直接读取缓存中的数据。

这样就能大大减少首页分类加载所需时间,提高并发性能。

加不加缓存的标准:

  1. 变化频率低
  2. 访问频繁

即时性、数据一致性要求不高的,访问量大且更新频率不高的数据(读多,写少)
举例:
1.电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定)
2.后台如果发布一个商品,买家需要5分钟才能看到新的商品一般还是可以接受的
3.物流信息

实现:使用Redis实现缓存。

缓存和分布式锁 - 图1

缓存分析

本地缓存

例如:使用map在单个服务下进行缓存。

缓存和分布式锁 - 图2

本地缓存再分布式中的问题

缓存和分布式锁 - 图3

分布式缓存

使用中间件实现分布式缓存。

缓存和分布式锁 - 图4

改造分类

封装数据库查询

  1. // 从数据库中获取分类
  2. public Map<String, List<Catalog2VO>> getCatalogJsonByDb() {
  3. // 查出所有一级分类
  4. List<CategoryEntity> level1Category = getLevel1Category(); // 防止多次查库
  5. // 将后面所有的查库先查出来,再通过getParentCid方法获取
  6. List<CategoryEntity> selectAllCategory = baseMapper.selectList(null);
  7. //封装数据
  8. Map<String, List<Catalog2VO>> map = level1Category.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
  9. // 根据一级分类id,查询二级分类数据
  10. List<CategoryEntity> l2 = getParentCid(selectAllCategory, v.getCatId());
  11. // 封装二级分类数据
  12. List<Catalog2VO> catalog2VOList = new ArrayList<>();
  13. if (l2 != null) {
  14. catalog2VOList = l2.stream().map(categoryEntity -> {
  15. Catalog2VO catalog2VO = new Catalog2VO(v.getCatId().toString(), null, categoryEntity.getCatId().toString(), categoryEntity.getName());
  16. // 根据二级分类查找三级分类,并封装
  17. List<CategoryEntity> l3 = getParentCid(selectAllCategory, categoryEntity.getCatId());
  18. List<Catalog2VO.Catalog3Vo> list = l3.stream().map(categoryEntity1 -> {
  19. Catalog2VO.Catalog3Vo catalog3Vo = new Catalog2VO.Catalog3Vo(categoryEntity.getCatId().toString(), categoryEntity1.getCatId().toString(), categoryEntity1.getName());
  20. return catalog3Vo;
  21. }).collect(Collectors.toList());
  22. catalog2VO.setCatalog3List(list);
  23. return catalog2VO;
  24. }).collect(Collectors.toList());
  25. }
  26. return catalog2VOList;
  27. }));
  28. return map;
  29. }

从缓存中获取分类

  1. // 从缓存中获取分类
  2. @Override
  3. public Map<String, List<Catalog2VO>> getCatalogJson() {
  4. // 先从缓存中读取数据
  5. String catalogJson = redisTemplate.opsForValue().get("catalogJson");
  6. // 缓存中没有数据,从数据库中查询
  7. if (StringUtils.isEmpty(catalogJson)) {
  8. Map<String, List<Catalog2VO>> catalogJsonByDb = getCatalogJsonByDb();
  9. // 转换为JSON字符串保存在缓存中
  10. String toJSONString = JSON.toJSONString(catalogJsonByDb);
  11. redisTemplate.opsForValue().set("catalogJson", toJSONString, 1, TimeUnit.DAYS);
  12. return catalogJsonByDb;
  13. }
  14. // 将保存在缓存中的信息转化为对象
  15. Map<String, List<Catalog2VO>> map = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2VO>>>() {
  16. });
  17. return map;
  18. }

在压力测试下会出现堆外内存溢出

缓存和分布式锁 - 图5

解决方法:更换客户端为jedis

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. <exclusions>
  5. <exclusion>
  6. <!-- 排除lettuce客户端 -->
  7. <groupId>io.lettuce</groupId>
  8. <artifactId>lettuce-core</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <!-- 添加jedis客户端 -->
  13. <dependency>
  14. <groupId>redis.clients</groupId>
  15. <artifactId>jedis</artifactId>
  16. </dependency>

缓存失效问题

缓存穿透

查询一个不存在的数据

缓存和分布式锁 - 图6

缓存雪崩

大面积的key失效

缓存和分布式锁 - 图7

缓存击穿

某一个热点key失效,导致太多请求DB

缓存和分布式锁 - 图8

加锁解决缓存击穿

在将数据库查询进行封装

  1. // 从数据库中获取,并存入redis中
  2. public Map<String, List<Catalog2VO>> getCatalogJsonByDbToRedis() {
  3. // 查出所有一级分类
  4. List<CategoryEntity> level1Category = getLevel1Category(); // 防止多次查库
  5. // 将后面所有的查库先查出来,再通过getParentCid方法获取
  6. List<CategoryEntity> selectAllCategory = baseMapper.selectList(null);
  7. //封装数据
  8. Map<String, List<Catalog2VO>> map = level1Category.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
  9. // 根据一级分类id,查询二级分类数据
  10. List<CategoryEntity> l2 = getParentCid(selectAllCategory, v.getCatId());
  11. // 封装二级分类数据
  12. List<Catalog2VO> catalog2VOList = new ArrayList<>();
  13. if (l2 != null) {
  14. catalog2VOList = l2.stream().map(categoryEntity -> {
  15. Catalog2VO catalog2VO = new Catalog2VO(v.getCatId().toString(), null, categoryEntity.getCatId().toString(), categoryEntity.getName());
  16. // 根据二级分类查找三级分类,并封装
  17. List<CategoryEntity> l3 = getParentCid(selectAllCategory, categoryEntity.getCatId());
  18. List<Catalog2VO.Catalog3Vo> list = l3.stream().map(categoryEntity1 -> {
  19. Catalog2VO.Catalog3Vo catalog3Vo = new Catalog2VO.Catalog3Vo(categoryEntity.getCatId().toString(), categoryEntity1.getCatId().toString(), categoryEntity1.getName());
  20. return catalog3Vo;
  21. }).collect(Collectors.toList());
  22. catalog2VO.setCatalog3List(list);
  23. return catalog2VO;
  24. }).collect(Collectors.toList());
  25. }
  26. return catalog2VOList;
  27. }));
  28. // 转换为JSON字符串保存在缓存中,查到了还需要放入缓存,防止放入缓存有延时,导致后面再次查库
  29. String toJSONString = JSON.toJSONString(map);
  30. redisTemplate.opsForValue().set("catalogJson", toJSONString);
  31. return map;
  32. }

本地锁

本地锁版本

缓存和分布式锁 - 图9

注意放入在加锁后查出,必须在加锁的条件下放入缓存,防止有延迟,还没放入缓存,下一个就进行查库

  1. // 本地锁版本(已经废弃,不能支持分布式)
  2. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithLocalLock() {
  3. // 本地锁:synchronized,JUC(lock),在分布式0情况下,需要使用分布式锁
  4. synchronized (this) {
  5. // 得到锁以后还要检查一次,double check
  6. Map<String, List<Catalog2VO>> catalogJsonByDb = getCatalogJsonByDb();
  7. // 转换为JSON字符串保存在缓存中,查到了还需要放入缓存,防止放入缓存有延时,导致后面再次查库
  8. String toJSONString = JSON.toJSONString(catalogJsonByDb);
  9. redisTemplate.opsForValue().set("catalogJson", toJSONString);
  10. return catalogJsonByDb;
  11. }
  12. }

本地锁只能解决一个单体架构,并不能解决分布式缓存击穿的问题,但效率在单体架构较高。

分布式锁

缓存和分布式锁 - 图10

基本原理

缓存和分布式锁 - 图11

分布式锁阶段一

缓存和分布式锁 - 图12

  1. // 分布式锁版本(原生的set实现)
  2. @Deprecated
  3. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedisLock() {
  4. // 加锁取占坑
  5. Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "111");
  6. if (ifAbsent) {
  7. // 加锁完成,执行业务
  8. Map<String, List<Catalog2VO>> result = getCatalogJsonByDbToRedis();
  9. redisTemplate.delete("lock"); // 释放锁
  10. return result;
  11. } else {
  12. // 加锁失败,重试
  13. // 休眠100ms重试
  14. return getCatalogJsonByDbWithRedisLock();
  15. }
  16. }

加入在释放锁的时候服务器宕机,那么就没有成功释放锁,此时会造成死锁问题。可以设置过期时间保证不出现死锁。

分布式锁阶段二

缓存和分布式锁 - 图13

阶段一出现的死锁问题,加上过期时间可以解决。

  1. // 分布式锁版本(原生的set实现)
  2. @Deprecated
  3. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedisLock() {
  4. // 加锁取占坑
  5. Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "111");
  6. if (ifAbsent) {
  7. // 设置过期时间为30s
  8. redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
  9. // 加锁完成,执行业务
  10. Map<String, List<Catalog2VO>> result = getCatalogJsonByDbToRedis();
  11. redisTemplate.delete("lock"); // 释放锁
  12. return result;
  13. } else {
  14. // 加锁失败,重试
  15. // 休眠100ms重试
  16. return getCatalogJsonByDbWithRedisLock();
  17. }
  18. }

但此时如果在设置过期时间前面宕机,那么过期时间设置不进去,所以过期时间和设置锁需要有原子性。

分布式锁阶段三

缓存和分布式锁 - 图14

优化代码如下:

  1. // 分布式锁版本(原生的set实现)
  2. @Deprecated
  3. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedisLock() {
  4. // 加锁取占坑
  5. Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "111", 30, TimeUnit.SECONDS);
  6. if (ifAbsent) {
  7. // 设置过期时间为30s
  8. // redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
  9. // 加锁完成,执行业务
  10. Map<String, List<Catalog2VO>> result = getCatalogJsonByDbToRedis();
  11. redisTemplate.delete("lock"); // 释放锁
  12. return result;
  13. } else {
  14. // 加锁失败,重试
  15. // 休眠100ms重试
  16. return getCatalogJsonByDbWithRedisLock();
  17. }
  18. }

在删除锁时,如果业务执行很长时间,锁自动过期,那么就会误删别人的锁。

分布式锁阶段四

此时设置uuid为锁的值,在删除时判断是否为自己的锁,可以一定程度防止误删。

缓存和分布式锁 - 图15

  1. // 分布式锁版本(原生的set实现)
  2. @Deprecated
  3. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedisLock() {
  4. // 加锁取占坑
  5. String uuid = UUID.randomUUID().toString();
  6. Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
  7. if (ifAbsent) {
  8. // 设置过期时间为30s,废弃,需要和加锁一起有原子性
  9. // redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
  10. // 加锁完成,执行业务
  11. Map<String, List<Catalog2VO>> result = getCatalogJsonByDbToRedis();
  12. String lockValue = redisTemplate.opsForValue().get("lock"); // 获取锁的值
  13. if (uuid.equals(lockValue)) {
  14. // 删除自己的锁
  15. redisTemplate.delete("lock"); // 释放锁
  16. }
  17. return result;
  18. } else {
  19. // 加锁失败,重试
  20. // 休眠100ms重试
  21. return getCatalogJsonByDbWithRedisLock();
  22. }
  23. }

但如果在这条语句 String lockValue = redisTemplate.opsForValue().get(“lock”); 传输回来时,此时锁过期,那么redis会马上更新锁,但此时执行后面时会误删别人的锁。

分布式锁阶段五

阶段四的问题需要将 获取值对比和删除锁 操作合成原子一起操作才能防止阶段四的问题。和阶段二出现的问题一致。

使用官方提供的lua脚本,保证删除和获取一起进行。

缓存和分布式锁 - 图16

  1. // 分布式锁版本(原生的set实现)
  2. @Deprecated
  3. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedisLock() {
  4. // 加锁去占坑
  5. String uuid = UUID.randomUUID().toString();
  6. Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
  7. if (ifAbsent) {
  8. // 设置过期时间为30s,废弃,需要和加锁一起有原子性
  9. // redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
  10. // 加锁完成,执行业务
  11. Map<String, List<Catalog2VO>> result;
  12. try {
  13. result = getCatalogJsonByDbToRedis();
  14. } finally {
  15. // 获取值和删除应该具有原子性
  16. // String lockValue = redisTemplate.opsForValue().get("lock"); // 获取锁的值
  17. // if (uuid.equals(lockValue)) {
  18. // 删除自己的锁
  19. // redisTemplate.delete("lock"); // 释放锁
  20. // }
  21. // 查询UUID是否是自己,是自己的lock就删除
  22. // 封装lua脚本(原子操作解锁)
  23. // 查询+删除(当前值与目标值是否相等,相等执行删除,不等返回0)
  24. String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1]\n" +
  25. "then\n" +
  26. " return redis.call('del',KEYS[1])\n" +
  27. "else\n" +
  28. " return 0\n" +
  29. "end";
  30. // 执行lua脚本,lock相当于赋值给脚本的KEYS[1],uuid相当于赋值给ARGV[1]
  31. // 删除成功返回1,失败返回0
  32. Long isDelete = redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class),
  33. Arrays.asList("lock"), uuid);
  34. }
  35. return result;
  36. } else {
  37. // 加锁失败,重试
  38. // 休眠200ms重试
  39. try {
  40. Thread.sleep(200);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. return getCatalogJsonByDbWithRedisLock();
  45. }
  46. }

Ression实现分布式锁

概述

官网🔗

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

可重入锁(Reentrant Lock)

链接🔗

导入依赖

  1. <!-- 以后使用redisson作为所有分布式锁,分布式对象框架 -->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson</artifactId>
  5. <version>3.12.0</version>
  6. </dependency>

没有超时时间

  1. @ResponseBody
  2. @GetMapping("/hello")
  3. public String hello() {
  4. // 获取去一把锁,只要锁的名字一样,就是同一把锁
  5. RLock lock = redissonClient.getLock("my-lock");
  6. // 加锁
  7. lock.lock(); //阻塞式等待。默认加的锁都是3es时间。
  8. try {
  9. // 锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删
  10. // 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
  11. System.out.println("加锁成功,业务执行时间..." + Thread.currentThread().getId());
  12. Thread.sleep(30000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } finally {
  16. // 最后一定要释放锁,解锁代码之前出现异常,解锁没有运行,redissson不会出现死锁
  17. System.out.println("释放锁" + Thread.currentThread().getId());
  18. lock.unlock();
  19. }
  20. return "hello";
  21. }

设置过期时间

  1. @ResponseBody
  2. @GetMapping("/hello1")
  3. public String hello1() {
  4. // 获取去一把锁,只要锁的名字一样,就是同一把锁
  5. RLock lock = redissonClient.getLock("my-lock");
  6. lock.lock(10, TimeUnit.SECONDS); // 10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
  7. // 问题: Lock.lock(10,TimeUnit.SECONDs);在锁时间到了以后,不会自动续期。
  8. // 1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  9. // 2、如果我们未指定锁的超时时间,就使用30 * 1o00【LockwatchdogTimeout看门狗的默认时间】;
  10. // 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动调用
  11. // internaLLockLeaseTime【看门狗时间】/ 3,10s
  12. try {
  13. // 锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删
  14. // 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
  15. System.out.println("加锁成功,业务执行时间..." + Thread.currentThread().getId());
  16. Thread.sleep(30000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. // 最后一定要释放锁,解锁代码之前出现异常,解锁没有运行,redissson不会出现死锁
  21. System.out.println("释放锁" + Thread.currentThread().getId());
  22. lock.unlock();
  23. }
  24. return "hello";
  25. }

问题: Lock.lock(10,TimeUnit.SECONDs);在锁时间到了以后,不会自动续期。

  • 如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  • 如果我们未指定锁的超时时间,就使用30 * 1o00【LockwatchdogTimeout看门狗的默认时间】;
    • 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动调用
    • internaLLockLeaseTime【看门狗时间】/ 3,10s

最终建议使用过期时间来实现分布式锁,实战模板,设置超时时间

  1. // 加锁以后10秒钟自动解锁
  2. // 无需调用unlock方法手动解锁
  3. lock.lock(10, TimeUnit.SECONDS);
  4. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
  5. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
  6. if (res) {
  7. try {
  8. ...
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

读写锁

保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。

  • 读锁是一个共享锁,写锁没释放读就必须等待
  • 读+读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功1/写+读:等待写锁释放
  • 写+写:阻塞方式
  • 读+写:有读锁。写也需要等待。l/只要有写的存在,都必须等待
  1. // 写锁就是排他锁
  2. @ResponseBody
  3. @GetMapping("/write")
  4. public String write() {
  5. // 获取一把读写锁,只要锁的名字一样,就是同一把锁
  6. RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
  7. String s = "";
  8. RLock rLock = lock.writeLock();
  9. // 改数据读写锁,写数据加写锁
  10. rLock.lock();
  11. try {
  12. System.out.println("加锁成功,业务执行时间..." + Thread.currentThread().getId());
  13. Thread.sleep(30000);
  14. s = UUID.randomUUID().toString();
  15. redisTemplate.opsForValue().set("writeValue", s);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } finally {
  19. rLock.unlock();
  20. }
  21. return "hello";
  22. }
  23. // 读锁就是共享锁
  24. @ResponseBody
  25. @GetMapping("/read")
  26. public String read() {
  27. // 获取一把读写锁,只要锁的名字一样,就是同一把锁
  28. RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
  29. String s = "";
  30. RLock rLock = lock.readLock();
  31. // 改数据读写锁,写数据加写锁
  32. rLock.lock();
  33. try {
  34. System.out.println("加锁成功,业务执行时间..." + Thread.currentThread().getId());
  35. Thread.sleep(30000);
  36. s = (String) redisTemplate.opsForValue().get("writeValue");
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. } finally {
  40. rLock.unlock();
  41. }
  42. return s;
  43. }

信号量

相当于汽车停车,有空位才会去停车,否则会一直等待。

  1. // 信号量测试
  2. @ResponseBody
  3. @GetMapping("/park")
  4. public String park() {
  5. RSemaphore park = redissonClient.getSemaphore("park");
  6. // park.acquire(); // 获取一个车位的值,占一个车位,如果车位没有值,会一直等待
  7. boolean b = park.tryAcquire();
  8. if (b) {
  9. // 执行业务
  10. } else {
  11. return "没有车位";
  12. }
  13. return "ok=" + b;
  14. }
  15. // 信号量测试
  16. @ResponseBody
  17. @GetMapping("/go")
  18. public String go() throws InterruptedException {
  19. RSemaphore park = redissonClient.getSemaphore("park");
  20. park.release(); // 释放一个车位,才能让停车获取
  21. return "success";
  22. }

闭锁

闭锁相当于教室关门,当人全部走了之后才会关门,否则会一直等待。

  1. // 闭锁测试(类似于开关门)
  2. // 需要door锁中的人全部走了才能关门
  3. @ResponseBody
  4. @GetMapping("/lockDoor")
  5. public String lockDoor() throws InterruptedException {
  6. RCountDownLatch downLatch = redissonClient.getCountDownLatch("door");
  7. downLatch.trySetCount(5); // 设置5个人走了才能关门
  8. downLatch.await(); // 关门操作,一直等待没有人才会关门
  9. return "关门成功!!!";
  10. }
  11. // 将闭锁减一,到0才会关门
  12. @ResponseBody
  13. @GetMapping("/leave")
  14. public String leave() throws InterruptedException {
  15. RCountDownLatch downLatch = redissonClient.getCountDownLatch("door");
  16. downLatch.countDown();
  17. return "离开成功!!!";
  18. }

Ression实现读取缓存

  1. // 分布式锁版本(Redissson实现)
  2. public Map<String, List<Catalog2VO>> getCatalogJsonByDbWithRedissonLock() {
  3. // 锁的名字。锁的粒度,越细越快。
  4. // 锁的粒度:具体缓存的是某个数据,11-号商品;product-11-lock product-12-lockproduct-Lock
  5. RLock rLock = redissonClient.getLock("catalogJson");
  6. rLock.lock(); // 加锁
  7. Map<String, List<Catalog2VO>> result;
  8. try {
  9. result = getCatalogJsonByDbToRedis();
  10. } finally {
  11. rLock.unlock(); // 解锁
  12. }
  13. return result;
  14. }

缓存数据不一致问题

在更新数据库中的信息时,缓存和数据存在不一致问题。

双写模式

缓存和分布式锁 - 图17

失效模式

缓存和分布式锁 - 图18

Canal数据同步

缓存和分布式锁 - 图19

根据实际情况解决

缓存和分布式锁 - 图20

最终项目方案

我们系统的—致性解决方案:

  • 缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新
  • 读写数据的时候,加上分布式的读写锁。
    • 经常写,经常读的情况性能确实会下降,应为写锁和读锁排斥
    • 不经常写,经常读,性能影响不大,读锁和读锁不互斥

SpringCache

操作步骤

导入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-cache</artifactId>
  4. </dependency>

指定使用什么作为缓存

  1. spring:
  2. cache:
  3. type: redis # 指定以redis作为缓存

开启缓存

  1. @EnableCaching // 开启缓存
  2. @SpringBootApplication
  3. @ComponentScan(basePackages = "com.atguigu")
  4. @EnableDiscoveryClient
  5. @EnableFeignClients(basePackages = "com.atguigu.gulimall.product.client")
  6. public class GulimallProductApplication {
  7. public static void main(String[] args) {
  8. SpringApplication.run(GulimallProductApplication.class, args);
  9. }
  10. }

使用注解@Cacheable

  1. @Cacheable(value = {"category"}) // 将数据保存到category分区
  2. @Override
  3. public List<CategoryEntity> getLevel1Category() {
  4. return this.list( new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
  5. }

使用注解@CacheEvict

缓存和分布式锁 - 图21

@caching可以包括多个@CacheEvict的内容

@CacheEvict删除什么分区下的key(key是字符串时,注意加上单引号)

  1. @Override
  2. @Caching(evict = {
  3. @CacheEvict(value = "category", key = "'getLevel1Category'"),
  4. @CacheEvict(value = "category", key = "'getCatalogJsonByDbBySpringCache'")
  5. })
  6. public void updateDetails(CategoryEntity category) {
  7. this.updateById(category);
  8. categoryBrandRelationService.updateCategoryDetails(category.getCatId(), category.getName());
  9. }

或者使用

  1. @CacheEvict(value = "category", allEntries = true) // 删除分区下的所有缓存
  2. public void updateDetails(CategoryEntity category) {}

自定义其他操作

设置分区下的key

  1. @Cacheable(value = {"category"},key = "#root.method.name") // 将数据保存到category分区,以方法名为key

设置过期时间

  1. spring:
  2. redis:
  3. time-to-live: 3600000 # ms为单位

其他配置

  1. spring:
  2. redis:
  3. key-prefix: CACHE_ # 缓存加上前缀,区分其他数据
  4. use-key-prefix: true # 是否使用前缀
  5. cache-null-values: true # 为空值是否保存,防止缓存穿透

配置value以Json的数据格式保存

原理

缓存和分布式锁 - 图22

创建配置类

缓存和分布式锁 - 图23

  1. @Configuration
  2. @EnableCaching // 开启缓存
  3. @EnableConfigurationProperties(CacheProperties.class) // 绑定CacheProperties类(是加载配置文件的类)
  4. public class CacheConfig {
  5. @Bean
  6. public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { // 作为参数传入,也可以使用@Autiwired
  7. RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
  8. config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); // 设置key的序列化机制
  9. config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));// 设置value的序列化机制为json序列化
  10. // 此时配置文件中的配置会失效
  11. CacheProperties.Redis redisProperties = cacheProperties.getRedis();
  12. if (redisProperties.getTimeToLive() != null) {
  13. config = config.entryTtl(redisProperties.getTimeToLive());
  14. }
  15. if (redisProperties.getKeyPrefix() != null) {
  16. config = config.prefixKeysWith(redisProperties.getKeyPrefix());
  17. }
  18. if (!redisProperties.isCacheNullValues()) {
  19. config = config.disableCachingNullValues();
  20. }
  21. if (!redisProperties.isUseKeyPrefix()) {
  22. config = config.disableKeyPrefix();
  23. }
  24. return config;
  25. }
  26. }

SpringCache的不足

缓存和分布式锁 - 图24

缓存击穿也只是在get获取缓存时加了本地锁,保存数据在缓存中并没有加本地所。

缓存和分布式锁 - 图25

总结:

  • 常规数据(读多写少,即时性,一致性要求不高的数据)﹔完全可以使用spring-Cache特殊数据:特殊设计
  • 写模式(只要缓存的数据有过期时间就足够了)