1. 用户登录
1.1 Session 共享问题
多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 Tomcat 服务时导致数据丢失的问题
session 的替代方案应该满足
- 数据共享
- 内存存储
- key、value 结构
1.2 基于 Redis 实现 Session 共享
利用 Spring MVC 提供的 HandlerInterceptor
实现用户 Tomcat 的 Session 机制(主要是刷新 token 和保存 token)
@Component
public class TokenRefreshInterceptor implements HandlerInterceptor {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 从 header 头中获取 authorization
String authorizationToken = request.getHeader("authorization");
if (StrUtil.isBlank(authorizationToken)) {
// 这里需要放行, 因为部分页面不需要登录
return true;
}
// 2. 从 redis 中获取用户信息
String redisKey = RedisConstants.USER_TOKEN_KEY + authorizationToken;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(redisKey);
// 3. 判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 4. 将查询到的 hash 数据转为 User
User user = BeanUtil.fillBeanWithMap(userMap, new User(), false);
// 5. 存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(user);
// 6. 刷新 token 有效期
stringRedisTemplate.expire(redisKey, RedisConstants.USER_TOKEN_TTL, TimeUnit.MINUTES);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}
@Component
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 必须要求强制登录(只对部分资源)
if (UserHolder.getUser() == null) {
response.setStatus(401);
return false;
}
return true;
}
}
@Slf4j
@Service
public class UserServiceImpl implements UserService {
@Resource
private UserMapper userMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result sendCode(String phone) {
// 1. 校验手机号码
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}
// 2. 符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 3. 保存验证码
stringRedisTemplate.opsForValue().set(RedisConstants.USER_CODE_KEY + phone, code, RedisConstants.USER_CODE_TTL, TimeUnit.MINUTES);
// 4. 发送验证码
log.info("发送短信验证码成功,验证码:{}", code);
return Result.ok();
}
@Override
public Result login(String phone, String code) {
// 1. 校验手机号码
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}
// 2. 从 redis 获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.USER_CODE_KEY + phone);
if (cacheCode == null || !cacheCode.equals(code)) {
return Result.fail("验证码错误");
}
// 3. 查询用户信息
Example example = new Example(User.class);
example.createCriteria().andEqualTo("phone", phone);
User user = userMapper.selectOneByExample(example);
if (user == null) {
// 4. 说明不存在首次注册
user = this.create(phone);
log.info("新注册用户 {}", user);
} else {
log.info("从数据库中查询到用户 {}", user);
}
// 5. 保存用户信息到 redis 中, 表示已经完成登录
String token = UUID.randomUUID().toString(true);
String tokenKey = RedisConstants.USER_TOKEN_KEY + token;
// 需要全部转成 string
stringRedisTemplate.opsForHash().putAll(tokenKey,
BeanUtil.beanToMap(user, new HashMap<>(), CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())));
stringRedisTemplate.expire(tokenKey, 30, TimeUnit.MINUTES);
// 6. 返回 token
return Result.ok(token);
}
private User create(String phone) {
User user = new User();
user.setPhone(phone);
user.setName("user_" + RandomUtil.randomString(10));
userMapper.insertSelective(user);
return user;
}
}
2. 缓存功能
2.1 缓存更新策略
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不用自己维护,利用 Redis 的内存淘汰机制,当内存不足时自动淘汰部分数据 下次查询时更新缓存 |
给缓存数据添加 TTL 过期时间,到期后自动删除缓存 下次查询时更新缓存 |
编写业务逻辑,在修改数据库的同时更新缓存 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
2.1.1 主动更新策略
其中主动更新策略又分为三种,一般常用第一种方式 Cache Aside Pattern
Cache Aside Pattern | Read/Write Through Pattern | Write Behind Caching Pattern |
---|---|---|
由缓存的调用者,在更新数据库的同时更新缓存 | 缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题 | 调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致 |
2.1.2 Cache Aside Pattern
操作缓存和数据库时有 3 个问题需要考虑
- 删除缓存还是更新缓存
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多(❎)
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存(✅)
- 如何保证缓存与数据库的操作的同时成功或失败
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
- 先操作缓存还是先操作数据库
- 先删除缓存,再操作数据库(❎)
- 先操作数据库,再删除缓存(✅)
2.1.3 缓存更新策略的最佳实践方案
低一致性需求:使用 Redis 自带的内存淘汰机制
高一致性需求:主动更新,并以超时剔除作为兜底方案
- 读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间
写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性
2.2 缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。常见的解决方案:
缓存空对象
- 优点:实现简单,维护方便
- 缺点:额外的内存消耗可能造成短期的不一致
- 布隆过滤
- 优点:内存占用较少,没有多余 key
- 缺点:实现复杂存在误判可能
/**
* 缓存穿透解决方案: 缓存空对象
*
* @param keyPrefix key 前缀
* @param id 主键标识
* @param type 返回值类型
* @param dbFallback 加载资源数据的地方, 是一个回调函数
* @param time 缓存时间
* @param unit 缓存时间单位
* @param <R> 返回值类型
* @param <ID> 主键类型
* @return 结果
*/
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type,
Function<ID, R> dbFallback,
Long time, TimeUnit unit) {
// 1. 构建 redis key
String key = keyPrefix + id;
// 2. 从 redis 中查找缓存数据
String json = stringRedisTemplate.opsForValue().get(key);
// 3. 判断缓存中的数据是否为无效值
if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
// 说明是无效值, 缓存穿透了, 直接返回
return null;
}
// 4. 如果缓存中有数据且不为无效值直接返回
if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}
// 5. 此时缓存中没有, 调用回调函数加载资源
R result = dbFallback.apply(id);
// 6. 如果不存在,则缓存无效值, 避免缓存穿透
if (result == null) {
this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
return null;
}
// 7. 如果加载到数据, 放入到缓存中
this.set(key, result, time, unit);
return result;
}
2.3 缓存雪崩
缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。常见的解决方案:
- 给不同的 Key 的 TTL 添加随机值
- 利用 Redis 集群提高服务的可用性
- 给缓存业务添加降级限流策略
-
2.4 缓存击穿
是针对缓存中没有但数据库有的数据。缓存击穿问题也叫热点 Key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。常见的解决方案:
互斥锁
- 优点:没有额外的内存消耗,保证一致性,实现简单
- 缺点:线程需要等待,性能有影响,可能有死锁风险
逻辑过期
- 优点:线程无需等待,性能较好
- 缺点:不保证一致性,有额外内存消耗,实现复杂
/**
* 缓存击穿解决方案: 互斥锁
*
* @param keyPrefix key 前缀
* @param id 主键标识
* @param type 返回值类型
* @param dbFallback 加载资源数据的地方, 是一个回调函数
* @param time 缓存时间
* @param unit 缓存时间单位
* @param <R> 返回值类型
* @param <ID> 主键类型
* @return 结果
*/
public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type,
Function<ID, R> dbFallback,
Long time, TimeUnit unit) {
// 1. 构建 redis key
String key = keyPrefix + id;
// 2. 从 redis 中查找缓存数据
String json = stringRedisTemplate.opsForValue().get(key);
// 3. 判断缓存中的数据是否为无效值
if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
// 说明是无效值, 缓存穿透了, 直接返回
return null;
}
// 4. 如果缓存中有数据且不为无效值直接返回
if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}
// 5. 互斥锁实现缓存重建
R result = null;
String lockKey = RedisConstants.LOCK_KEY_PREFIX + key;
try {
// 6. 如果获取锁失败, 说明已经有线程在更新缓存, 这里进入重试即可
if (!tryLock(lockKey)) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
return this.queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}
// 7. 如果获取锁成功, 则执行查询逻辑
result = dbFallback.apply(id);
// 5.3 如果不存在,则缓存无效值, 避免缓存穿透
if (result == null) {
this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
return null;
}
// 8. 如果加载到数据, 放入到缓存中
this.set(key, result, time, unit);
} finally {
this.unlock(lockKey);
}
return result;
}
/**
* 缓存击穿解决方案: 逻辑过期
*
* @param keyPrefix key 前缀
* @param id 主键标识
* @param type 返回值类型
* @param dbFallback 加载资源数据的地方, 是一个回调函数
* @param time 缓存时间
* @param unit 缓存时间单位
* @param <R> 返回值类型
* @param <ID> 主键类型
* @return 结果
*/
public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type,
Function<ID, R> dbFallback,
Long time, TimeUnit unit) {
// 1. 构建 redis key
String key = keyPrefix + id;
// 2. 从 redis 中查找缓存数据
String json = stringRedisTemplate.opsForValue().get(key);
// 3. 判断缓存中的数据是否为无效值
if (Objects.equals(json, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG)) {
// 说明是无效值, 缓存穿透了, 直接返回
return null;
}
// 4. 根据逻辑时间判断是否已经过期
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R result = JSONUtil.toBean((JSONObject) redisData.getData(), type);
// 5. 如果有数据而且未过期, 直接返回
if (result != null && redisData.getExpireTime().isAfter(LocalDateTime.now())) {
return result;
}
// 6. 过期了或者缓存中无数据, 则需要缓存重建
String lockKey = RedisConstants.LOCK_KEY_PREFIX + key;
// 7. 如果获取锁失败, 说明已经有线程在更新缓存, 这里直接返回旧数据即可
if (!tryLock(lockKey)) {
return result;
}
// 8. 如果获取锁成功, 则开启一个新线程处理缓存重建任务
cacheBuildThreadPool.submit(() -> {
try {
// 重建缓存
R result1 = dbFallback.apply(id);
if (result1 == null) {
CacheClient.this.set(key, RedisConstants.CACHE_PASS_THROUGH_INVALID_FLAG,
RedisConstants.CACHE_PASS_THROUGH_INVALID_TTL, TimeUnit.SECONDS);
} else {
CacheClient.this.setWithLogicalExpire(key, result1, time, unit);
}
} finally {
unlock(lockKey);
}
});
return result;
}
3. 秒杀功能
3.1 全局唯一ID
为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息:
ID 的组成部分:
符号位:1bit,永远为 0
- 时间戳:31bit,以秒为单位,可以使用 69 年
序列号:32bit,秒内的计数器,支持每秒产生 232 个不同 ID ```java @Component public class RedisIdGenerate {
@Resource private StringRedisTemplate stringRedisTemplate;
/**
- 基础时间
LocalDateTime.of(2022, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) */ private static final Long BEGIN_START_TIME = 1640995200L;
public long nextId(String prefix) { // 1. 生成时间戳 LocalDateTime now = LocalDateTime.now(); long timestamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_START_TIME; // 2. 生成序列号 String date = now.format(DateTimeFormatter.ofPattern(“yyyyMMdd”)); String key = RedisConstants.ID_GEN_KEY + prefix + “:” + date; long sequence = stringRedisTemplate.opsForValue().increment(key); // 3. 时间戳左移 32 位成高位, 低 32 位用 sequence 填充 return timestamp << 32 | sequence; }
}
<a name="NmjeI"></a>
## 3.2 秒杀下单
```java
@Override
@Transactional(rollbackFor = Exception.class)
public Result create(Long goodsId) {
// 1. 校验优惠券开始时间和结束时间
Goods goods = goodsService.getById(goodsId);
if (goods == null) {
return Result.fail("优惠券不存在");
}
LocalDateTime now = LocalDateTime.now();
if (goods.getBeginTime().isAfter(now)) {
return Result.fail("秒杀尚未开始");
}
if (goods.getEndTime().isBefore(now)) {
return Result.fail("秒杀已经结束");
}
// 2. 校验库存
if (goods.getStock() <= 0) {
return Result.fail("商品库存不足");
}
// 3. 扣减库存
int stockOpRes = goodsService.updateStock(goods);
if (stockOpRes <= 0) {
return Result.fail("扣减库存失败");
}
// 4. 生成订单
Order order = new Order();
order.setId(redisIdGenerate.nextId("order"));
Long userId = UserHolder.getUser().getId();
order.setUserId(userId);
order.setGoodsId(goodsId);
return Result.ok(orderMapper.insertSelective(order));
}
在校验库存和扣减库存时,是有可能发生线程安全问题的,所以需要使用锁来保证线程安全
3.2.1 悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如 Synchronized、Lock 都属于悲观锁
3.2.2 乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改
- 如果没有修改则认为是安全的,自己才更新数据
- 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常
/**
* 优化 1: 基于库存的乐观锁改良, 即把 stock 字段当成 version 作为版本控制
* 如果这里每次都用乐观锁校验, 会有大量的重试才可以执行成功, 否则大量并发请求都会失败
* 乐观锁: 提供一个 version 字段, 每次更新时需要校验当前的 version 是不是之前查询出来的 version
*/
@Update("update goods set stock = stock-1 where id = #{goodsId} and stock = #{stock}")
int updateStock(@Param("goodsId") Long goodsId, @Param("stock") int stock);
/**
* 优化 2: 基于库存的乐观锁改良, 只要满足 stock > 0 就可以扣减库存, 可以减少无畏重试
*/
@Update("update goods set stock = stock-1 where id = #{goodsId} and stock > 0")
int updateStockGt0(@Param("goodsId") Long goodsId);
3.3 一人一单
3.3.1 基于 setnx 实现的分布式锁
| | MySQL | Redis | ZooKeeper | | —- | —- | —- | —- | | 互斥 | 利用 MySQL 本身的互斥锁机制 | 利用 setnx 这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 | | 高可用 | 好 | 好 | 好 | | 高性能 | 一般 | 好 | 一般 | | 安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
基于 Redis 的分布式锁实现思路:
- 利用
set nx ex
获取锁,并设置过期时间,保存线程标示 - 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用
set nx
满足互斥性 - 利用
set ex
保证故障时锁依然能释放,避免死锁,提高安全性 利用 Redis 集群保证高可用和高并发特性
-- 这里使用 lua 脚本保证操作的原子性, 即判断 + 删除操作
-- 比较线程标识和锁中的标识是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁
return redis.call('del', KEYS[1])
end
return 0
```java @Slf4j public class RedisLock implements Lock {
private final String name;
private final StringRedisTemplate stringRedisTemplate;
public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String THREAD_ID_PREFIX = UUID.fastUUID().toString(true);
private static final DefaultRedisScript
UNLOCK_SCRIPT; static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override public boolean tryLock(long time, TimeUnit unit) {
String threadId = THREAD_ID_PREFIX + "_" + Thread.currentThread().getId();
// 加锁时需要把自己的线程 id 存入, 防止别人调用了解锁
return Boolean.TRUE.equals(
stringRedisTemplate.opsForValue().setIfAbsent(
RedisConstants.LOCK_KEY + name,
threadId, unit.toMillis(time), TimeUnit.MILLISECONDS));
}
@Override public void unlock() {
String threadId = THREAD_ID_PREFIX + "_" + Thread.currentThread().getId();
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(RedisConstants.LOCK_KEY + name),
threadId);
}
}
```java
/**
* 使用 setnx 实现分布式锁
*/
@Transactional(rollbackFor = Exception.class)
public Result createOrderWithSetNx(Long goodsId, int stock) {
Long userId = UserHolder.getUser().getId();
int orderOpRes;
// 锁粒度是 goodsId + userId
String name = "order:create:" + goodsId + "_" + userId;
RedisLock lock = new RedisLock(name, stringRedisTemplate);
// 1. 判断是否获取到锁, 每个人只能购买一次
if (!lock.tryLock(30, TimeUnit.SECONDS)) {
return Result.fail("活动太火爆, 请稍等");
}
try {
// 2. 校验一人一单
Example example = new Example(Order.class);
example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
List<Order> orderList = orderMapper.selectByExample(example);
if (orderList.size() > 0) {
return Result.fail("超出购买限制");
}
// 3. 扣减库存
int stockOpRes = goodsService.updateStock(goodsId, stock);
if (stockOpRes <= 0) {
return Result.fail("扣减库存失败");
}
// 4. 生成订单
Order order = new Order();
order.setId(redisIdGenerate.nextId("order"));
order.setUserId(userId);
order.setGoodsId(goodsId);
orderOpRes = orderMapper.insertSelective(order);
if (orderOpRes <= 0) {
return Result.fail("订单保存失败");
}
} finally {
// 5. 释放锁
lock.unlock();
}
return Result.ok("下单成功");
}
3.3.2 基于 Redisson 实现的分布式锁
基于 setnx
实现的分布式锁存在下面的问题:
- 不可重入:同一个线程无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回 false,没有重试机制
- 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
主从一致:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从节点还未同步主节点中的锁数据,则会出现锁重复
/**
* 使用 redisson 实现
*/
@Transactional(rollbackFor = Exception.class)
public Result createOrderWithRedisson(Long goodsId) {
Long userId = UserHolder.getUser().getId();
int orderOpRes;
// 锁粒度是 goodsId + userId
String name = "order:create:" + goodsId + "_" + userId;
// 1. 判断是否获取到锁, 每个人只能购买一次
RLock lock = redissonClient.getLock(name);
try {
// 参数1: 获取锁的等待时间, 不为 -1 则会触发重试机制, 这里最多等待 10s(如果不传则默认值为-1, 不能触发重试机制)
// 参数2: 默认释放锁的超时时间, 上锁之后 30s 会自动解锁(如果不传则默认值为-1, 会触发看门狗机制)
// 参数3: 时间单位
if (!lock.tryLock(10, 30, TimeUnit.SECONDS)) {
return Result.fail("活动太火爆, 请稍等");
}
} catch (InterruptedException e) {
log.error("获取锁失败", e);
}
try {
// 2. 校验一人一单
Example example = new Example(Order.class);
example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
List<Order> orderList = orderMapper.selectByExample(example);
if (orderList.size() > 0) {
return Result.fail("超出购买限制");
}
// 3. 扣减库存
int stockOpRes = goodsService.updateStockGt0(goodsId);
if (stockOpRes <= 0) {
return Result.fail("扣减库存失败");
}
// 4. 生成订单
Order order = new Order();
order.setId(redisIdGenerate.nextId("order"));
order.setUserId(userId);
order.setGoodsId(goodsId);
orderOpRes = orderMapper.insertSelective(order);
if (orderOpRes <= 0) {
return Result.fail("订单保存失败");
}
} finally {
// 5. 释放锁
lock.unlock();
}
return Result.ok("下单成功");
}
3.4 Redisson
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括(
BitSet
,Set
,Multimap
,SortedSet
,Map
,List
,Queue
,BlockingQueue
,Deque
,BlockingDeque
,Semaphore
,Lock
,AtomicLong
,CountDownLatch
,Publish / Subscribe
,Bloom filter
,Remote service
,Spring cache
,Executor service
,Live Object service
,Scheduler service
) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。3.4.1 可重入锁
可重入:利用 hash 结构记录线程 id 和重入次数
可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制(waitTime 必须要设置值)
超时续约:利用 watchDog 每隔一段时间(leaseTime / 3
),重置超时时间(leaseTime 必须要为 -1 才能触发看门狗机制)-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 如果不存在就添加锁并设置当前的线程标示
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置锁有效时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
-- 如果锁存在且线程标示是自己
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 锁次数 + 1,设置锁的有效期
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
-- 如果锁存在且线程标示不是自己,则获取锁失败,返回需要等待释放锁的时间
return redis.call('pttl', KEYS[1]);
-- 判断锁的线程标示是否是自己
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 如果不是自己则直接返回
return nil;
end ;
-- 如果锁线程标示是自己,锁次数 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 如果此时锁次数 >0
if (counter > 0) then
-- 重置锁有效期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 如果锁次数为 0,则需要释放锁
redis.call('del', KEYS[1]);
-- 通过 publish 命令发布事件,通知锁已经释放
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end ;
return nil;
-- 判断锁的线程标示是否是自己
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 续约超时时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
return 0;
3.4.2 联锁和红锁
为了解决 Redis 主从集群宕带来锁失效的问题,可以使用联锁和红锁解决,即创建多个独立的 Redis 节点,分别对每个实例进行加锁
联锁:N 个实例都必须获取到锁,有一个失败,即为失败
- 红锁:N 个实例至少有
N/2 + 1
个实例获取到锁才算成功 ```java RLock lock1 = redissonInstance1.getLock(“lock1”); RLock lock2 = redissonInstance2.getLock(“lock2”); RLock lock3 = redissonInstance3.getLock(“lock3”);
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 所有的锁都上锁成功才算成功。 lock.lock(); … lock.unlock();
```java
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
3.5 异步秒杀
- 新增秒杀商品的同时,将商品信息保存到 Redis 中
- 基于Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将 商品 id 和用户 id 封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
3.5.1 基于 List 结构模拟消息队列
Redis 的 List 数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH
结合RPOP
、或者RPUSH
结合LPOP
来实现。不过要注意的是,当队列中没有消息时RPOP
或LPOP
操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息
因此这里应该使用BRPOP
或者BLPOP
来实现阻塞效果
基于List的消息队列有哪些优缺点
- 优点
- 利用 Redis 存储,不受限于 JVM 内存上限
- 基于 Redis 的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点
- 无法避免消息丢失
只支持单消费者 ```java @Slf4j @Service public class OrderAsyncServiceImpl implements OrderService {
@Resource private OrderMapper orderMapper;
@Resource private GoodsService goodsService;
@Resource private StringRedisTemplate stringRedisTemplate;
@Resource private RedisIdGenerate redisIdGenerate;
private static final DefaultRedisScript
SEC_KILL_SCRIPT; static { SEC_KILL_SCRIPT = new DefaultRedisScript<>(); SEC_KILL_SCRIPT.setLocation(new ClassPathResource(“lua/seckill.lua”)); SEC_KILL_SCRIPT.setResultType(Long.class); }
@Override public Result create(Long goodsId) { // 1. 校验优惠券开始时间和结束时间 Goods goods = goodsService.getById(goodsId); if (goods == null) {
return Result.fail("优惠券不存在");
} LocalDateTime now = LocalDateTime.now(); if (goods.getBeginTime().isAfter(now)) {
return Result.fail("秒杀尚未开始");
} if (goods.getEndTime().isBefore(now)) {
return Result.fail("秒杀已经结束");
} // 2. 创建订单 return this.createAsyncOrder(goods); }
/**
- 异步下单
*/
private Result createAsyncOrder(Goods goods) {
Long userId = UserHolder.getUser().getId();
Long goodsId = goods.getId();
long orderId = redisIdGenerate.nextId(“order”);
Long res = stringRedisTemplate.execute(SEC_KILL_SCRIPT, Collections.emptyList(),
if (res == null) {String.valueOf(goodsId), String.valueOf(userId), String.valueOf(orderId));
} int result = res.intValue(); if (result == 1) {return Result.fail("下单失败");
} if (result == 2) {return Result.fail("商品库存不足");
} return Result.ok(“下单成功”); }return Result.fail("超出购买限制");
@PostConstruct public void dealTopicOrder() { Executors.newSingleThreadExecutor(r -> new Thread(r, “deal-topic-order”))
.execute(() -> {
while (true) {
// 从 topic:order 中获取订单数据, 完成下单操作
String result = stringRedisTemplate.opsForList().rightPop("topic:order", 3, TimeUnit.SECONDS);
if (StringUtil.isBlank(result)) {
continue;
}
TopicOrderData topicOrderData = JSONUtil.toBean(result, TopicOrderData.class);
Long userId = topicOrderData.getUserId();
Long goodsId = topicOrderData.getGoodsId();
// 1. 校验一人一单
Example example = new Example(Order.class);
example.createCriteria().andEqualTo("userId", userId).andEqualTo("goodsId", goodsId);
List<Order> orderList = orderMapper.selectByExample(example);
if (orderList.size() > 0) {
log.error("超出购买限制");
return;
}
// 2. 扣减库存
int stockOpRes = goodsService.updateStockGt0(goodsId);
if (stockOpRes <= 0) {
log.error("商品库存扣减失败");
return;
}
// 3. 生成订单
Order order = new Order();
order.setId(redisIdGenerate.nextId("order"));
order.setUserId(userId);
order.setGoodsId(goodsId);
int orderOpRes = orderMapper.insertSelective(order);
if (orderOpRes <= 0) {
log.error("订单生成失败");
return;
}
log.info("异步创单完成 {}", order);
}
});
}
}
<a name="qsAN8"></a>
### 3.5.2 基于 Pub、Sub 的消息队列
PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
**基于 PubSub 的消息队列有哪些优缺点**
- 优点
- 采用发布订阅模型,支持多生产、多消费
- 缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
<a name="Odd1E"></a>
### 3.5.3 基于 Stream 的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列
**STREAM 类型消息队列的 XREADGROUP 命令特点**
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
<a name="LdvMk"></a>
### 3.5.4 Redis 实现消息队列总结
| | List | PubSub | Stream |
| --- | --- | --- | --- |
| 消息持久化 | 支持 | 不支持 | 支持 |
| 阻塞读取 | 支持 | 支持 | 支持 |
| 消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
| 消息确认机制 | 不支持 | 不支持 | 支持 |
| 消息回溯 | 不支持 | 不支持 | 支持 |
<a name="uCzoO"></a>
# 4. 其它功能
<a name="j4Wo3"></a>
## 4.1 点赞排行榜
按照点赞时间先后排序,返回 Top3 的用户信息
| | List | Set | SortedSet |
| --- | --- | --- | --- |
| 排序方式 | 按添加顺序排序 | 无法排序 | 根据 score 值排序 |
| 唯一性 | 不唯一 | 唯一 | 不唯一 |
| 查找方式 | 按索引查找或首尾查找 | 根据元素查找 | 根据元素查找 |
```java
@Service
public class BlogServiceImpl implements BlogService {
@Resource
private BlogMapper blogMapper;
@Resource
private UserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result create(Blog blog) {
// 创建博客
return Result.ok(blogMapper.insertSelective(blog));
}
@Override
public Result like(Long blogId) {
// 1. 获取登录用户 id
User user = UserHolder.getUser();
if (user == null) {
// 说明当前用户未登录
return Result.ok();
}
Long userId = user.getId();
// 2. 判断当前用户是否已经点过赞
String key = "blog:like:" + blogId;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null) {
// 如果未点赞, 则可以点赞
int likedRes = blogMapper.liked(blogId);
if (likedRes == 1) {
// 添加到 sortedSet 集合中
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
} else {
// 如果已经点过赞, 则取消赞
int unlikedRes = blogMapper.unliked(blogId);
if (unlikedRes == 1) {
// 从 sortedSet 集合中移除
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}
public Result list(Long blogId) {
// 1. 查询最早点赞 top3
String key = "blog:like:" + blogId;
Set<String> top3 = stringRedisTemplate.opsForZSet().range(key, 0, 2);
if (CollectionUtil.isEmpty(top3)) {
return Result.ok();
}
// 2. 解析出用户 id
List<Long> userIds = top3.stream().map(Long::valueOf).collect(Collectors.toList());
List<User> userList = userService.findByIds(userIds);
return Result.ok(userList);
}
}
4.2 用户签到
4.2.1 BitMap 用法
把每一个 bit 位对应当月的每一天,形成了映射关系。用 0 和 1 标示业务状态,这种思路就称为位图(BitMap)
Redis 中是利用 string 类型数据结构实现 BitMap,因此最大上限是 512M,转换为 bit 则是 232 个 bit 位
SETBIT:向指定位置(offset)存入一个 0 或 1
GETBIT :获取指定位置(offset)的 bit 值
BITCOUNT :统计 BitMap 中值为 1 的 bit 位的数量
BITFIELD :操作(查询、修改、自增)BitMap 中 bit 数组中的指定位置(offset)的值
BITFIELD_RO :获取 BitMap 中 bit 数组,并以十进制形式返回
BITOP :将多个 BitMap 的结果做位运算(与 、或、异或)
BITPOS :查找 bit 数组中指定范围内第一个 0 或 1 出现的位置
4.2.2 签到功能
@Override
public Result sign() {
// 1. 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2. 获取当前日期
LocalDateTime now = LocalDateTime.now();
// 3. 拼接 key
String key = RedisConstants.USER_SIGN_KEY + now.format(DateTimeFormatter.ofPattern("yyyyMM")) + ":" + userId;
// 4. 获取今天是本月的第几天, 下标是从 1 开始
int dayOfMonth = now.getDayOfMonth();
Boolean signFlag = stringRedisTemplate.opsForValue().getBit(key, dayOfMonth - 1);
if (Boolean.TRUE.equals(signFlag)) {
return Result.ok("当天已经签到");
}
// 5. 向 BitMap 中设置值表示完成签到
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok("签到成功");
}
4.2.3 签到统计
连续签到天数:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数
如何得到本月到今天为止的所有签到数据:BITFIELD key GET u[dayOfMonth] 0
如何从后向前遍历每个 bit 位:与 1 做与运算,就能得到最后一个 bit 位。随后右移 1 位,下一个 bit 位就成为了最后一个 bit 位
@Override
public Result signCount() {
// 1. 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2. 获取当前日期
LocalDateTime now = LocalDateTime.now();
// 3. 拼接 key
String key = RedisConstants.USER_SIGN_KEY + now.format(DateTimeFormatter.ofPattern("yyyyMM")) + ":" + userId;
// 4. 获取今天是本月的第几天, 下标是从 1 开始
int dayOfMonth = now.getDayOfMonth();
// 5.获取本月截止今天为止的所有的签到记录
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (CollectionUtil.isEmpty(result) || result.get(0) == null) {
// 没有任何签到结果
return Result.ok(0);
}
Long signNum = result.get(0);
// 6. 循环遍历
int count = 0;
while (true) {
// 与 1 做与运算,得到数字的最后一个 bit 位, 如果这个 bit 为 0, 则表示签到中断
if ((signNum & 1) == 0) {
break;
} else {
// 不为 0 表示签到
count++;
}
// 把数字右移一位,抛弃最后一个 bit 位,继续下一个 bit 位
signNum = signNum >> 1;
}
return Result.ok(count);
}