1. 用户登录

1.1 Session 共享问题

多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 Tomcat 服务时导致数据丢失的问题

session 的替代方案应该满足

  • 数据共享
  • 内存存储
  • key、value 结构

image.png

1.2 基于 Redis 实现 Session 共享

image.png
利用 Spring MVC 提供的 HandlerInterceptor实现用户 Tomcat 的 Session 机制(主要是刷新 token 和保存 token)
image.png

  1. @Component
  2. public class TokenRefreshInterceptor implements HandlerInterceptor {
  3. @Resource
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Override
  6. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  7. // 1. 从 header 头中获取 authorization
  8. String authorizationToken = request.getHeader("authorization");
  9. if (StrUtil.isBlank(authorizationToken)) {
  10. // 这里需要放行, 因为部分页面不需要登录
  11. return true;
  12. }
  13. // 2. 从 redis 中获取用户信息
  14. String redisKey = RedisConstants.USER_TOKEN_KEY + authorizationToken;
  15. Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(redisKey);
  16. // 3. 判断用户是否存在
  17. if (userMap.isEmpty()) {
  18. return true;
  19. }
  20. // 4. 将查询到的 hash 数据转为 User
  21. User user = BeanUtil.fillBeanWithMap(userMap, new User(), false);
  22. // 5. 存在,保存用户信息到 ThreadLocal
  23. UserHolder.saveUser(user);
  24. // 6. 刷新 token 有效期
  25. stringRedisTemplate.expire(redisKey, RedisConstants.USER_TOKEN_TTL, TimeUnit.MINUTES);
  26. return true;
  27. }
  28. @Override
  29. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  30. // 移除用户
  31. UserHolder.removeUser();
  32. }
  33. }
  1. @Component
  2. public class LoginInterceptor implements HandlerInterceptor {
  3. @Override
  4. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  5. // 必须要求强制登录(只对部分资源)
  6. if (UserHolder.getUser() == null) {
  7. response.setStatus(401);
  8. return false;
  9. }
  10. return true;
  11. }
  12. }
  1. @Slf4j
  2. @Service
  3. public class UserServiceImpl implements UserService {
  4. @Resource
  5. private UserMapper userMapper;
  6. @Resource
  7. private StringRedisTemplate stringRedisTemplate;
  8. @Override
  9. public Result sendCode(String phone) {
  10. // 1. 校验手机号码
  11. if (RegexUtils.isPhoneInvalid(phone)) {
  12. return Result.fail("手机号格式错误!");
  13. }
  14. // 2. 符合,生成验证码
  15. String code = RandomUtil.randomNumbers(6);
  16. // 3. 保存验证码
  17. stringRedisTemplate.opsForValue().set(RedisConstants.USER_CODE_KEY + phone, code, RedisConstants.USER_CODE_TTL, TimeUnit.MINUTES);
  18. // 4. 发送验证码
  19. log.info("发送短信验证码成功,验证码:{}", code);
  20. return Result.ok();
  21. }
  22. @Override
  23. public Result login(String phone, String code) {
  24. // 1. 校验手机号码
  25. if (RegexUtils.isPhoneInvalid(phone)) {
  26. return Result.fail("手机号格式错误!");
  27. }
  28. // 2. 从 redis 获取验证码并校验
  29. String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.USER_CODE_KEY + phone);
  30. if (cacheCode == null || !cacheCode.equals(code)) {
  31. return Result.fail("验证码错误");
  32. }
  33. // 3. 查询用户信息
  34. Example example = new Example(User.class);
  35. example.createCriteria().andEqualTo("phone", phone);
  36. User user = userMapper.selectOneByExample(example);
  37. if (user == null) {
  38. // 4. 说明不存在首次注册
  39. user = this.create(phone);
  40. log.info("新注册用户 {}", user);
  41. } else {
  42. log.info("从数据库中查询到用户 {}", user);
  43. }
  44. // 5. 保存用户信息到 redis 中, 表示已经完成登录
  45. String token = UUID.randomUUID().toString(true);
  46. String tokenKey = RedisConstants.USER_TOKEN_KEY + token;
  47. // 需要全部转成 string
  48. stringRedisTemplate.opsForHash().putAll(tokenKey,
  49. BeanUtil.beanToMap(user, new HashMap<>(), CopyOptions.create()
  50. .setIgnoreNullValue(true)
  51. .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())));
  52. stringRedisTemplate.expire(tokenKey, 30, TimeUnit.MINUTES);
  53. // 6. 返回 token
  54. return Result.ok(token);
  55. }
  56. private User create(String phone) {
  57. User user = new User();
  58. user.setPhone(phone);
  59. user.setName("user_" + RandomUtil.randomString(10));
  60. userMapper.insertSelective(user);
  61. return user;
  62. }
  63. }

2. 缓存功能

2.1 缓存更新策略

内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用 Redis 的内存淘汰机制,当内存不足时自动淘汰部分数据

下次查询时更新缓存
给缓存数据添加 TTL 过期时间,到期后自动删除缓存

下次查询时更新缓存
编写业务逻辑,在修改数据库的同时更新缓存
一致性 一般
维护成本

2.1.1 主动更新策略

其中主动更新策略又分为三种,一般常用第一种方式 Cache Aside Pattern

Cache Aside Pattern Read/Write Through Pattern Write Behind Caching Pattern
由缓存的调用者,在更新数据库的同时更新缓存 缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题 调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致

2.1.2 Cache Aside Pattern

操作缓存和数据库时有 3 个问题需要考虑

  1. 删除缓存还是更新缓存
    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多(❎)
    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存(✅)
  2. 如何保证缓存与数据库的操作的同时成功或失败
    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布式事务方案
  3. 先操作缓存还是先操作数据库
    • 先删除缓存,再操作数据库(❎)
    • 先操作数据库,再删除缓存(✅)

image.png

2.1.3 缓存更新策略的最佳实践方案

低一致性需求:使用 Redis 自带的内存淘汰机制
高一致性需求:主动更新,并以超时剔除作为兜底方案

  • 读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性

    2.2 缓存穿透

    缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。常见的解决方案:

  • 缓存空对象

    • 优点:实现简单,维护方便
    • 缺点:额外的内存消耗可能造成短期的不一致
  • 布隆过滤
    • 优点:内存占用较少,没有多余 key
    • 缺点:实现复杂存在误判可能

image.png

  1. /**
  2. * 缓存穿透解决方案: 缓存空对象
  3. *
  4. * @param keyPrefix key 前缀
  5. * @param id 主键标识
  6. * @param type 返回值类型
  7. * @param dbFallback 加载资源数据的地方, 是一个回调函数
  8. * @param time 缓存时间
  9. * @param unit 缓存时间单位
  10. * @param <R> 返回值类型
  11. * @param <ID> 主键类型
  12. * @return 结果
  13. */
  14. public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type,
  15. Function<ID, R> dbFallback,
  16. Long time, TimeUnit unit) {
  17. // 1. 构建 redis key
  18. String key = keyPrefix + id;
  19. // 2. 从 redis 中查找缓存数据
  20. String json = stringRedisTemplate.opsForValue().get(key);
  21. // 3. 判断缓存中的数据是否为无效值
  22. if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
  23. // 说明是无效值, 缓存穿透了, 直接返回
  24. return null;
  25. }
  26. // 4. 如果缓存中有数据且不为无效值直接返回
  27. if (StrUtil.isNotBlank(json)) {
  28. return JSONUtil.toBean(json, type);
  29. }
  30. // 5. 此时缓存中没有, 调用回调函数加载资源
  31. R result = dbFallback.apply(id);
  32. // 6. 如果不存在,则缓存无效值, 避免缓存穿透
  33. if (result == null) {
  34. this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
  35. RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
  36. return null;
  37. }
  38. // 7. 如果加载到数据, 放入到缓存中
  39. this.set(key, result, time, unit);
  40. return result;
  41. }

2.3 缓存雪崩

缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。常见的解决方案:

  • 给不同的 Key 的 TTL 添加随机值
  • 利用 Redis 集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

    2.4 缓存击穿

    是针对缓存中没有但数据库有的数据。缓存击穿问题也叫热点 Key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。常见的解决方案:

  • 互斥锁

    • 优点:没有额外的内存消耗,保证一致性,实现简单
    • 缺点:线程需要等待,性能有影响,可能有死锁风险
  • 逻辑过期

    • 优点:线程无需等待,性能较好
    • 缺点:不保证一致性,有额外内存消耗,实现复杂

      image.png

      1. /**
      2. * 缓存击穿解决方案: 互斥锁
      3. *
      4. * @param keyPrefix key 前缀
      5. * @param id 主键标识
      6. * @param type 返回值类型
      7. * @param dbFallback 加载资源数据的地方, 是一个回调函数
      8. * @param time 缓存时间
      9. * @param unit 缓存时间单位
      10. * @param <R> 返回值类型
      11. * @param <ID> 主键类型
      12. * @return 结果
      13. */
      14. public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type,
      15. Function<ID, R> dbFallback,
      16. Long time, TimeUnit unit) {
      17. // 1. 构建 redis key
      18. String key = keyPrefix + id;
      19. // 2. 从 redis 中查找缓存数据
      20. String json = stringRedisTemplate.opsForValue().get(key);
      21. // 3. 判断缓存中的数据是否为无效值
      22. if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
      23. // 说明是无效值, 缓存穿透了, 直接返回
      24. return null;
      25. }
      26. // 4. 如果缓存中有数据且不为无效值直接返回
      27. if (StrUtil.isNotBlank(json)) {
      28. return JSONUtil.toBean(json, type);
      29. }
      30. // 5. 互斥锁实现缓存重建
      31. R result = null;
      32. String lockKey = RedisConstants.LOCK_KEY_PREFIX + key;
      33. try {
      34. // 6. 如果获取锁失败, 说明已经有线程在更新缓存, 这里进入重试即可
      35. if (!tryLock(lockKey)) {
      36. try {
      37. TimeUnit.MILLISECONDS.sleep(1);
      38. } catch (InterruptedException e) {
      39. log.error(e.getMessage(), e);
      40. }
      41. return this.queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
      42. }
      43. // 7. 如果获取锁成功, 则执行查询逻辑
      44. result = dbFallback.apply(id);
      45. // 5.3 如果不存在,则缓存无效值, 避免缓存穿透
      46. if (result == null) {
      47. this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
      48. RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
      49. return null;
      50. }
      51. // 8. 如果加载到数据, 放入到缓存中
      52. this.set(key, result, time, unit);
      53. } finally {
      54. this.unlock(lockKey);
      55. }
      56. return result;
      57. }
      1. /**
      2. * 缓存击穿解决方案: 逻辑过期
      3. *
      4. * @param keyPrefix key 前缀
      5. * @param id 主键标识
      6. * @param type 返回值类型
      7. * @param dbFallback 加载资源数据的地方, 是一个回调函数
      8. * @param time 缓存时间
      9. * @param unit 缓存时间单位
      10. * @param <R> 返回值类型
      11. * @param <ID> 主键类型
      12. * @return 结果
      13. */
      14. public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type,
      15. Function<ID, R> dbFallback,
      16. Long time, TimeUnit unit) {
      17. // 1. 构建 redis key
      18. String key = keyPrefix + id;
      19. // 2. 从 redis 中查找缓存数据
      20. String json = stringRedisTemplate.opsForValue().get(key);
      21. // 3. 判断缓存中的数据是否为无效值
      22. if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
      23. // 说明是无效值, 缓存穿透了, 直接返回
      24. return null;
      25. }
      26. // 4. 根据逻辑时间判断是否已经过期
      27. RedisData redisData = JSONUtil.toBean(json, RedisData.class);
      28. R result = JSONUtil.toBean((JSONObject) redisData.getData(), type);
      29. // 5. 如果有数据而且未过期, 直接返回
      30. if (result != null && redisData.getExpireTime().isAfter(LocalDateTime.now())) {
      31. return result;
      32. }
      33. // 6. 过期了或者缓存中无数据, 则需要缓存重建
      34. String lockKey = RedisConstants.LOCK_KEY_PREFIX + key;
      35. // 7. 如果获取锁失败, 说明已经有线程在更新缓存, 这里直接返回旧数据即可
      36. if (!tryLock(lockKey)) {
      37. return result;
      38. }
      39. // 8. 如果获取锁成功, 则开启一个新线程处理缓存重建任务
      40. cacheBuildThreadPool.submit(() -> {
      41. try {
      42. // 重建缓存
      43. R result1 = dbFallback.apply(id);
      44. if (result1 == null) {
      45. CacheClient.this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
      46. RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
      47. } else {
      48. CacheClient.this.setWithLogicalExpire(key, result1, time, unit);
      49. }
      50. } finally {
      51. unlock(lockKey);
      52. }
      53. });
      54. return result;
      55. }

      3. 秒杀功能

      3.1 全局唯一ID

      为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息:
      image.png
      ID 的组成部分:
  • 符号位:1bit,永远为 0

  • 时间戳:31bit,以秒为单位,可以使用 69 年
  • 序列号:32bit,秒内的计数器,支持每秒产生 232 个不同 ID ```java @Component public class RedisIdGenerate {

    @Resource private StringRedisTemplate stringRedisTemplate;

    /**

    • 基础时间
    • LocalDateTime.of(2022, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) */ private static final Long BEGIN_START_TIME = 1640995200L;

      public long nextId(String prefix) { // 1. 生成时间戳 LocalDateTime now = LocalDateTime.now(); long timestamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_START_TIME; // 2. 生成序列号 String date = now.format(DateTimeFormatter.ofPattern(“yyyyMMdd”)); String key = RedisConstants.ID_GEN_KEY + prefix + “:” + date; long sequence = stringRedisTemplate.opsForValue().increment(key); // 3. 时间戳左移 32 位成高位, 低 32 位用 sequence 填充 return timestamp << 32 | sequence; }

}

  1. <a name="NmjeI"></a>
  2. ## 3.2 秒杀下单
  3. ```java
  4. @Override
  5. @Transactional(rollbackFor = Exception.class)
  6. public Result create(Long goodsId) {
  7. // 1. 校验优惠券开始时间和结束时间
  8. Goods goods = goodsService.getById(goodsId);
  9. if (goods == null) {
  10. return Result.fail("优惠券不存在");
  11. }
  12. LocalDateTime now = LocalDateTime.now();
  13. if (goods.getBeginTime().isAfter(now)) {
  14. return Result.fail("秒杀尚未开始");
  15. }
  16. if (goods.getEndTime().isBefore(now)) {
  17. return Result.fail("秒杀已经结束");
  18. }
  19. // 2. 校验库存
  20. if (goods.getStock() <= 0) {
  21. return Result.fail("商品库存不足");
  22. }
  23. // 3. 扣减库存
  24. int stockOpRes = goodsService.updateStock(goods);
  25. if (stockOpRes <= 0) {
  26. return Result.fail("扣减库存失败");
  27. }
  28. // 4. 生成订单
  29. Order order = new Order();
  30. order.setId(redisIdGenerate.nextId("order"));
  31. Long userId = UserHolder.getUser().getId();
  32. order.setUserId(userId);
  33. order.setGoodsId(goodsId);
  34. return Result.ok(orderMapper.insertSelective(order));
  35. }

在校验库存和扣减库存时,是有可能发生线程安全问题的,所以需要使用锁来保证线程安全

3.2.1 悲观锁

认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如 Synchronized、Lock 都属于悲观锁

3.2.2 乐观锁

认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改

  • 如果没有修改则认为是安全的,自己才更新数据
  • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常
    1. /**
    2. * 优化 1: 基于库存的乐观锁改良, 即把 stock 字段当成 version 作为版本控制
    3. * 如果这里每次都用乐观锁校验, 会有大量的重试才可以执行成功, 否则大量并发请求都会失败
    4. * 乐观锁: 提供一个 version 字段, 每次更新时需要校验当前的 version 是不是之前查询出来的 version
    5. */
    6. @Update("update goods set stock = stock-1 where id = #{goodsId} and stock = #{stock}")
    7. int updateStock(@Param("goodsId") Long goodsId, @Param("stock") int stock);
    1. /**
    2. * 优化 2: 基于库存的乐观锁改良, 只要满足 stock > 0 就可以扣减库存, 可以减少无畏重试
    3. */
    4. @Update("update goods set stock = stock-1 where id = #{goodsId} and stock > 0")
    5. int updateStockGt0(@Param("goodsId") Long goodsId);

    3.3 一人一单

    3.3.1 基于 setnx 实现的分布式锁

    | | MySQL | Redis | ZooKeeper | | —- | —- | —- | —- | | 互斥 | 利用 MySQL 本身的互斥锁机制 | 利用 setnx 这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 | | 高可用 | 好 | 好 | 好 | | 高性能 | 一般 | 好 | 一般 | | 安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |

基于 Redis 的分布式锁实现思路:

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用 set nx 满足互斥性
  • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用 Redis 集群保证高可用和高并发特性

    1. -- 这里使用 lua 脚本保证操作的原子性, 即判断 + 删除操作
    2. -- 比较线程标识和锁中的标识是否一致
    3. if(redis.call('get', KEYS[1]) == ARGV[1]) then
    4. -- 释放锁
    5. return redis.call('del', KEYS[1])
    6. end
    7. return 0

    ```java @Slf4j public class RedisLock implements Lock {

    private final String name;

    private final StringRedisTemplate stringRedisTemplate;

    public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {

    1. this.name = name;
    2. this.stringRedisTemplate = stringRedisTemplate;

    }

    private static final String THREAD_ID_PREFIX = UUID.fastUUID().toString(true);

    private static final DefaultRedisScript UNLOCK_SCRIPT;

    static {

    1. UNLOCK_SCRIPT = new DefaultRedisScript<>();
    2. UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
    3. UNLOCK_SCRIPT.setResultType(Long.class);

    }

    @Override public boolean tryLock(long time, TimeUnit unit) {

    1. String threadId = THREAD_ID_PREFIX + "_" + Thread.currentThread().getId();
    2. // 加锁时需要把自己的线程 id 存入, 防止别人调用了解锁
    3. return Boolean.TRUE.equals(
    4. stringRedisTemplate.opsForValue().setIfAbsent(
    5. RedisConstants.LOCK_KEY + name,
    6. threadId, unit.toMillis(time), TimeUnit.MILLISECONDS));

    }

    @Override public void unlock() {

    1. String threadId = THREAD_ID_PREFIX + "_" + Thread.currentThread().getId();
    2. stringRedisTemplate.execute(
    3. UNLOCK_SCRIPT,
    4. Collections.singletonList(RedisConstants.LOCK_KEY + name),
    5. threadId);

    }

}

  1. ```java
  2. /**
  3. * 使用 setnx 实现分布式锁
  4. */
  5. @Transactional(rollbackFor = Exception.class)
  6. public Result createOrderWithSetNx(Long goodsId, int stock) {
  7. Long userId = UserHolder.getUser().getId();
  8. int orderOpRes;
  9. // 锁粒度是 goodsId + userId
  10. String name = "order:create:" + goodsId + "_" + userId;
  11. RedisLock lock = new RedisLock(name, stringRedisTemplate);
  12. // 1. 判断是否获取到锁, 每个人只能购买一次
  13. if (!lock.tryLock(30, TimeUnit.SECONDS)) {
  14. return Result.fail("活动太火爆, 请稍等");
  15. }
  16. try {
  17. // 2. 校验一人一单
  18. Example example = new Example(Order.class);
  19. example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
  20. List<Order> orderList = orderMapper.selectByExample(example);
  21. if (orderList.size() > 0) {
  22. return Result.fail("超出购买限制");
  23. }
  24. // 3. 扣减库存
  25. int stockOpRes = goodsService.updateStock(goodsId, stock);
  26. if (stockOpRes <= 0) {
  27. return Result.fail("扣减库存失败");
  28. }
  29. // 4. 生成订单
  30. Order order = new Order();
  31. order.setId(redisIdGenerate.nextId("order"));
  32. order.setUserId(userId);
  33. order.setGoodsId(goodsId);
  34. orderOpRes = orderMapper.insertSelective(order);
  35. if (orderOpRes <= 0) {
  36. return Result.fail("订单保存失败");
  37. }
  38. } finally {
  39. // 5. 释放锁
  40. lock.unlock();
  41. }
  42. return Result.ok("下单成功");
  43. }

3.3.2 基于 Redisson 实现的分布式锁

基于 setnx 实现的分布式锁存在下面的问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回 false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从节点还未同步主节点中的锁数据,则会出现锁重复

    1. /**
    2. * 使用 redisson 实现
    3. */
    4. @Transactional(rollbackFor = Exception.class)
    5. public Result createOrderWithRedisson(Long goodsId) {
    6. Long userId = UserHolder.getUser().getId();
    7. int orderOpRes;
    8. // 锁粒度是 goodsId + userId
    9. String name = "order:create:" + goodsId + "_" + userId;
    10. // 1. 判断是否获取到锁, 每个人只能购买一次
    11. RLock lock = redissonClient.getLock(name);
    12. try {
    13. // 参数1: 获取锁的等待时间, 不为 -1 则会触发重试机制, 这里最多等待 10s(如果不传则默认值为-1, 不能触发重试机制)
    14. // 参数2: 默认释放锁的超时时间, 上锁之后 30s 会自动解锁(如果不传则默认值为-1, 会触发看门狗机制)
    15. // 参数3: 时间单位
    16. if (!lock.tryLock(10, 30, TimeUnit.SECONDS)) {
    17. return Result.fail("活动太火爆, 请稍等");
    18. }
    19. } catch (InterruptedException e) {
    20. log.error("获取锁失败", e);
    21. }
    22. try {
    23. // 2. 校验一人一单
    24. Example example = new Example(Order.class);
    25. example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
    26. List<Order> orderList = orderMapper.selectByExample(example);
    27. if (orderList.size() > 0) {
    28. return Result.fail("超出购买限制");
    29. }
    30. // 3. 扣减库存
    31. int stockOpRes = goodsService.updateStockGt0(goodsId);
    32. if (stockOpRes <= 0) {
    33. return Result.fail("扣减库存失败");
    34. }
    35. // 4. 生成订单
    36. Order order = new Order();
    37. order.setId(redisIdGenerate.nextId("order"));
    38. order.setUserId(userId);
    39. order.setGoodsId(goodsId);
    40. orderOpRes = orderMapper.insertSelective(order);
    41. if (orderOpRes <= 0) {
    42. return Result.fail("订单保存失败");
    43. }
    44. } finally {
    45. // 5. 释放锁
    46. lock.unlock();
    47. }
    48. return Result.ok("下单成功");
    49. }

    3.4 Redisson

    Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

    3.4.1 可重入锁

    可重入:利用 hash 结构记录线程 id 和重入次数
    可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制(waitTime 必须要设置值)
    超时续约:利用 watchDog 每隔一段时间(leaseTime / 3),重置超时时间(leaseTime 必须要为 -1 才能触发看门狗机制)

    1. -- 判断锁是否存在
    2. if (redis.call('exists', KEYS[1]) == 0) then
    3. -- 如果不存在就添加锁并设置当前的线程标示
    4. redis.call('hincrby', KEYS[1], ARGV[2], 1);
    5. -- 设置锁有效时间
    6. redis.call('pexpire', KEYS[1], ARGV[1]);
    7. return nil;
    8. end ;
    9. -- 如果锁存在且线程标示是自己
    10. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    11. -- 锁次数 + 1,设置锁的有效期
    12. redis.call('hincrby', KEYS[1], ARGV[2], 1);
    13. redis.call('pexpire', KEYS[1], ARGV[1]);
    14. return nil;
    15. end ;
    16. -- 如果锁存在且线程标示不是自己,则获取锁失败,返回需要等待释放锁的时间
    17. return redis.call('pttl', KEYS[1]);
    1. -- 判断锁的线程标示是否是自己
    2. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    3. -- 如果不是自己则直接返回
    4. return nil;
    5. end ;
    6. -- 如果锁线程标示是自己,锁次数 -1
    7. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    8. -- 如果此时锁次数 >0
    9. if (counter > 0) then
    10. -- 重置锁有效期
    11. redis.call('pexpire', KEYS[1], ARGV[2]);
    12. return 0;
    13. else
    14. -- 如果锁次数为 0,则需要释放锁
    15. redis.call('del', KEYS[1]);
    16. -- 通过 publish 命令发布事件,通知锁已经释放
    17. redis.call('publish', KEYS[2], ARGV[1]);
    18. return 1;
    19. end ;
    20. return nil;
    1. -- 判断锁的线程标示是否是自己
    2. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    3. -- 续约超时时间
    4. redis.call('pexpire', KEYS[1], ARGV[1]);
    5. return 1;
    6. end ;
    7. return 0;

    05. Redisson 可重入锁原理.jpg

    3.4.2 联锁和红锁

    为了解决 Redis 主从集群宕带来锁失效的问题,可以使用联锁和红锁解决,即创建多个独立的 Redis 节点,分别对每个实例进行加锁

  • 联锁:N 个实例都必须获取到锁,有一个失败,即为失败

  • 红锁:N 个实例至少有 N/2 + 1 个实例获取到锁才算成功 ```java RLock lock1 = redissonInstance1.getLock(“lock1”); RLock lock2 = redissonInstance2.getLock(“lock2”); RLock lock3 = redissonInstance3.getLock(“lock3”);

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 所有的锁都上锁成功才算成功。 lock.lock(); … lock.unlock();

  1. ```java
  2. RLock lock1 = redissonInstance1.getLock("lock1");
  3. RLock lock2 = redissonInstance2.getLock("lock2");
  4. RLock lock3 = redissonInstance3.getLock("lock3");
  5. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
  6. // 同时加锁:lock1 lock2 lock3
  7. // 红锁在大部分节点上加锁成功就算成功。
  8. lock.lock();
  9. ...
  10. lock.unlock();

3.5 异步秒杀

  1. 新增秒杀商品的同时,将商品信息保存到 Redis 中
  2. 基于Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  3. 如果抢购成功,将 商品 id 和用户 id 封装后存入阻塞队列
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

    3.5.1 基于 List 结构模拟消息队列

    Redis 的 List 数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现。不过要注意的是,当队列中没有消息时 RPOPLPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息
    因此这里应该使用 BRPOP 或者 BLPOP来实现阻塞效果

基于List的消息队列有哪些优缺点

  • 优点
    • 利用 Redis 存储,不受限于 JVM 内存上限
    • 基于 Redis 的持久化机制,数据安全性有保证
    • 可以满足消息有序性
  • 缺点

    • 无法避免消息丢失
    • 只支持单消费者 ```java @Slf4j @Service public class OrderAsyncServiceImpl implements OrderService {

      @Resource private OrderMapper orderMapper;

      @Resource private GoodsService goodsService;

      @Resource private StringRedisTemplate stringRedisTemplate;

      @Resource private RedisIdGenerate redisIdGenerate;

      private static final DefaultRedisScript SEC_KILL_SCRIPT;

      static { SEC_KILL_SCRIPT = new DefaultRedisScript<>(); SEC_KILL_SCRIPT.setLocation(new ClassPathResource(“lua/seckill.lua”)); SEC_KILL_SCRIPT.setResultType(Long.class); }

      @Override public Result create(Long goodsId) { // 1. 校验优惠券开始时间和结束时间 Goods goods = goodsService.getById(goodsId); if (goods == null) {

      1. return Result.fail("优惠券不存在");

      } LocalDateTime now = LocalDateTime.now(); if (goods.getBeginTime().isAfter(now)) {

      1. return Result.fail("秒杀尚未开始");

      } if (goods.getEndTime().isBefore(now)) {

      1. return Result.fail("秒杀已经结束");

      } // 2. 创建订单 return this.createAsyncOrder(goods); }

      /**

      • 异步下单 */ private Result createAsyncOrder(Goods goods) { Long userId = UserHolder.getUser().getId(); Long goodsId = goods.getId(); long orderId = redisIdGenerate.nextId(“order”); Long res = stringRedisTemplate.execute(SEC_KILL_SCRIPT, Collections.emptyList(),
        1. String.valueOf(goodsId), String.valueOf(userId), String.valueOf(orderId));
        if (res == null) {
        1. return Result.fail("下单失败");
        } int result = res.intValue(); if (result == 1) {
        1. return Result.fail("商品库存不足");
        } if (result == 2) {
        1. return Result.fail("超出购买限制");
        } return Result.ok(“下单成功”); }

      @PostConstruct public void dealTopicOrder() { Executors.newSingleThreadExecutor(r -> new Thread(r, “deal-topic-order”))

      1. .execute(() -> {
      2. while (true) {
      3. // 从 topic:order 中获取订单数据, 完成下单操作
      4. String result = stringRedisTemplate.opsForList().rightPop("topic:order", 3, TimeUnit.SECONDS);
      5. if (StringUtil.isBlank(result)) {
      6. continue;
      7. }
      8. TopicOrderData topicOrderData = JSONUtil.toBean(result, TopicOrderData.class);
      9. Long userId = topicOrderData.getUserId();
      10. Long goodsId = topicOrderData.getGoodsId();
      11. // 1. 校验一人一单
      12. Example example = new Example(Order.class);
      13. example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
      14. List<Order> orderList = orderMapper.selectByExample(example);
      15. if (orderList.size() > 0) {
      16. log.error("超出购买限制");
      17. return;
      18. }
      19. // 2. 扣减库存
      20. int stockOpRes = goodsService.updateStockGt0(goodsId);
      21. if (stockOpRes <= 0) {
      22. log.error("商品库存扣减失败");
      23. return;
      24. }
      25. // 3. 生成订单
      26. Order order = new Order();
      27. order.setId(redisIdGenerate.nextId("order"));
      28. order.setUserId(userId);
      29. order.setGoodsId(goodsId);
      30. int orderOpRes = orderMapper.insertSelective(order);
      31. if (orderOpRes <= 0) {
      32. log.error("订单生成失败");
      33. return;
      34. }
      35. log.info("异步创单完成 {}", order);
      36. }
      37. });

      }

}

  1. <a name="qsAN8"></a>
  2. ### 3.5.2 基于 Pub、Sub 的消息队列
  3. PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息
  4. - SUBSCRIBE channel [channel] :订阅一个或多个频道
  5. - PUBLISH channel msg :向一个频道发送消息
  6. - PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
  7. **基于 PubSub 的消息队列有哪些优缺点**
  8. - 优点
  9. - 采用发布订阅模型,支持多生产、多消费
  10. - 缺点:
  11. - 不支持数据持久化
  12. - 无法避免消息丢失
  13. - 消息堆积有上限,超出时数据丢失
  14. <a name="Odd1E"></a>
  15. ### 3.5.3 基于 Stream 的消息队列
  16. Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列
  17. **STREAM 类型消息队列的 XREADGROUP 命令特点**
  18. - 消息可回溯
  19. - 可以多消费者争抢消息,加快消费速度
  20. - 可以阻塞读取
  21. - 没有消息漏读的风险
  22. - 有消息确认机制,保证消息至少被消费一次
  23. <a name="LdvMk"></a>
  24. ### 3.5.4 Redis 实现消息队列总结
  25. | | List | PubSub | Stream |
  26. | --- | --- | --- | --- |
  27. | 消息持久化 | 支持 | 不支持 | 支持 |
  28. | 阻塞读取 | 支持 | 支持 | 支持 |
  29. | 消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
  30. | 消息确认机制 | 不支持 | 不支持 | 支持 |
  31. | 消息回溯 | 不支持 | 不支持 | 支持 |
  32. <a name="uCzoO"></a>
  33. # 4. 其它功能
  34. <a name="j4Wo3"></a>
  35. ## 4.1 点赞排行榜
  36. 按照点赞时间先后排序,返回 Top3 的用户信息
  37. | | List | Set | SortedSet |
  38. | --- | --- | --- | --- |
  39. | 排序方式 | 按添加顺序排序 | 无法排序 | 根据 score 值排序 |
  40. | 唯一性 | 不唯一 | 唯一 | 不唯一 |
  41. | 查找方式 | 按索引查找或首尾查找 | 根据元素查找 | 根据元素查找 |
  42. ```java
  43. @Service
  44. public class BlogServiceImpl implements BlogService {
  45. @Resource
  46. private BlogMapper blogMapper;
  47. @Resource
  48. private UserService userService;
  49. @Resource
  50. private StringRedisTemplate stringRedisTemplate;
  51. @Override
  52. public Result create(Blog blog) {
  53. // 创建博客
  54. return Result.ok(blogMapper.insertSelective(blog));
  55. }
  56. @Override
  57. public Result like(Long blogId) {
  58. // 1. 获取登录用户 id
  59. User user = UserHolder.getUser();
  60. if (user == null) {
  61. // 说明当前用户未登录
  62. return Result.ok();
  63. }
  64. Long userId = user.getId();
  65. // 2. 判断当前用户是否已经点过赞
  66. String key = "blog:like:" + blogId;
  67. Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
  68. if (score == null) {
  69. // 如果未点赞, 则可以点赞
  70. int likedRes = blogMapper.liked(blogId);
  71. if (likedRes == 1) {
  72. // 添加到 sortedSet 集合中
  73. stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
  74. }
  75. } else {
  76. // 如果已经点过赞, 则取消赞
  77. int unlikedRes = blogMapper.unliked(blogId);
  78. if (unlikedRes == 1) {
  79. // 从 sortedSet 集合中移除
  80. stringRedisTemplate.opsForZSet().remove(key, userId.toString());
  81. }
  82. }
  83. return Result.ok();
  84. }
  85. public Result list(Long blogId) {
  86. // 1. 查询最早点赞 top3
  87. String key = "blog:like:" + blogId;
  88. Set<String> top3 = stringRedisTemplate.opsForZSet().range(key, 0, 2);
  89. if (CollectionUtil.isEmpty(top3)) {
  90. return Result.ok();
  91. }
  92. // 2. 解析出用户 id
  93. List<Long> userIds = top3.stream().map(Long::valueOf).collect(Collectors.toList());
  94. List<User> userList = userService.findByIds(userIds);
  95. return Result.ok(userList);
  96. }
  97. }

4.2 用户签到

4.2.1 BitMap 用法

image.png
把每一个 bit 位对应当月的每一天,形成了映射关系。用 0 和 1 标示业务状态,这种思路就称为位图(BitMap)
Redis 中是利用 string 类型数据结构实现 BitMap,因此最大上限是 512M,转换为 bit 则是 232 个 bit 位

  1. SETBIT:向指定位置(offset)存入一个 0 1
  2. GETBIT :获取指定位置(offset)的 bit
  3. BITCOUNT :统计 BitMap 中值为 1 bit 位的数量
  4. BITFIELD :操作(查询、修改、自增)BitMap bit 数组中的指定位置(offset)的值
  5. BITFIELD_RO :获取 BitMap bit 数组,并以十进制形式返回
  6. BITOP :将多个 BitMap 的结果做位运算(与 、或、异或)
  7. BITPOS :查找 bit 数组中指定范围内第一个 0 1 出现的位置

4.2.2 签到功能

  1. @Override
  2. public Result sign() {
  3. // 1. 获取当前登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. // 2. 获取当前日期
  6. LocalDateTime now = LocalDateTime.now();
  7. // 3. 拼接 key
  8. String key = RedisConstants.USER_SIGN_KEY + now.format(DateTimeFormatter.ofPattern("yyyyMM")) + ":" + userId;
  9. // 4. 获取今天是本月的第几天, 下标是从 1 开始
  10. int dayOfMonth = now.getDayOfMonth();
  11. Boolean signFlag = stringRedisTemplate.opsForValue().getBit(key, dayOfMonth - 1);
  12. if (Boolean.TRUE.equals(signFlag)) {
  13. return Result.ok("当天已经签到");
  14. }
  15. // 5. 向 BitMap 中设置值表示完成签到
  16. stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
  17. return Result.ok("签到成功");
  18. }

4.2.3 签到统计

连续签到天数:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数
如何得到本月到今天为止的所有签到数据BITFIELD key GET u[dayOfMonth] 0
如何从后向前遍历每个 bit 位:与 1 做与运算,就能得到最后一个 bit 位。随后右移 1 位,下一个 bit 位就成为了最后一个 bit 位

  1. @Override
  2. public Result signCount() {
  3. // 1. 获取当前登录用户
  4. Long userId = UserHolder.getUser().getId();
  5. // 2. 获取当前日期
  6. LocalDateTime now = LocalDateTime.now();
  7. // 3. 拼接 key
  8. String key = RedisConstants.USER_SIGN_KEY + now.format(DateTimeFormatter.ofPattern("yyyyMM")) + ":" + userId;
  9. // 4. 获取今天是本月的第几天, 下标是从 1 开始
  10. int dayOfMonth = now.getDayOfMonth();
  11. // 5.获取本月截止今天为止的所有的签到记录
  12. List<Long> result = stringRedisTemplate.opsForValue().bitField(
  13. key,
  14. BitFieldSubCommands.create()
  15. .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
  16. );
  17. if (CollectionUtil.isEmpty(result) || result.get(0) == null) {
  18. // 没有任何签到结果
  19. return Result.ok(0);
  20. }
  21. Long signNum = result.get(0);
  22. // 6. 循环遍历
  23. int count = 0;
  24. while (true) {
  25. // 与 1 做与运算,得到数字的最后一个 bit 位, 如果这个 bit 为 0, 则表示签到中断
  26. if ((signNum & 1) == 0) {
  27. break;
  28. } else {
  29. // 不为 0 表示签到
  30. count++;
  31. }
  32. // 把数字右移一位,抛弃最后一个 bit 位,继续下一个 bit 位
  33. signNum = signNum >> 1;
  34. }
  35. return Result.ok(count);
  36. }