什么是分布式锁
单体应用锁:在一个JVM进程内有效,无法跨JVM、跨进程
分布式锁:可以跨越多个JVM、跨越多个进程的锁
分布式锁的设计思路
由于Tomcat是由Java启动的,所以每个Tomcat可以看成一个JVM,JVM内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些JVM之外去寻找,通过其他的组件来实现分布式锁。系统的架构如图所示:
两个Tomcat通过第三方的组件实现跨JVM、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有JVM可以共同访问的第三方组件,通过第三方组件实现分布式锁。
分布式锁方案
分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:
- 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
- Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
- Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。
超卖
什么是超卖?
- 商品卖出数量超过库存数量
超卖现象一
系统中库存1,但是产生两笔订单
商品存库1,A和B同时看到商品,加入购物车,同时提交订单
产生原因:
- 扣减库存的动作,在程序中进行,在程序中计算剩余库存
- 扣减库存的动作,在程序中进行,在程序中计算剩余库存
解决方法:
- 扣减库存不在程序中进行,而是通过数据库
- 向数据库传递库存增量,扣减一个库存,增量为-1
- 在数据库update语句计算库存,通过update行锁解决并发
超卖现象二
系统中库存变为-1
卖家不知所措,询问平台客服
产生原因:
并发检验库存,造成库存充足的假象
update更新库存,导致库存为负数
解决方法:
- 校验库存、扣减库存统一加锁
- 使之成为原子性的操作
- 并发时,只有获得锁的线程才能校验、扣减库存
- 扣减库存后,释放锁
- 确保库存不会扣成负数
基于Synchronized锁解决超卖问题(最原始的锁)
基于ReentrantLock锁解决并发超卖问题(并发包中的锁)
基于数据库悲观锁的分布式锁
多个进程、多个线程访问共同组件数据库
通过select…….for update访问一条数据
for update锁定数据,其他线程只能等待
<select id="selectDistributeLock" resultType="com.yy.distributelock.model.DistributeLock">
select * from distribute_lock
where business_code = #{businessCode,jdbcType=VARCHAR}
for update
</select>
@RequestMapping("dbLock")
@Transactional(rollbackFor = Exception.class)
public String dbLock() throws Exception {
log.info("我进入了方法!");
DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");
if (distributeLock==null) throw new Exception("分布式锁找不到");
log.info("我进入了锁!");
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "我已经执行完成!";
}
基于数据库设计分布式锁优缺点
- 优点:简单方便、易于理解、易于操作
- 缺点:并发量大时,对数据库压力较大
- 建议:作为锁的数据库与业务数据库分开
基于Redis的Setnx实现分布式锁
实现原理
获取锁的Redis命令
SET key my_random_value NX PX 30000
- key:资源名称,可根据不同的业务区分不同的锁
- my_random_value:随机值,每个线程的随机值都不同,用于释放锁时的校验
- NX:key不存在时设置成功,key存在时设置不成功
- PX:自动失效的时间,出现异常情况,锁可以过期失效
利用NX的原子性,多个线程并发时,只有一个线程可以设置成功(因为Redis单线程)
设置成功即获得锁,可以执行后续的业务处理
如果出现异常,过了锁的有效期,锁自动释放
释放锁采用Redis的delete命令
锁释放时校验之前设置的随机数,相同才能释放
释放锁的LUA脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
- 代码实现
@RequestMapping("redisLock")
public String redisLock(){
log.info("我进入了方法!");
try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
if (redisLock.getLock()) {
log.info("我进入了锁!!");
Thread.sleep(15000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成");
return "方法执行完成";
}
@Slf4j
public class RedisLock implements AutoCloseable {
private RedisTemplate redisTemplate;
private String key;
private String value;
//单位:秒
private int expireTime;
public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.expireTime=expireTime;
this.value = UUID.randomUUID().toString();
}
/**
* 获取分布式锁
* @return
*/
public boolean getLock(){
RedisCallback<Boolean> redisCallback = connection -> {
//设置NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
//设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
//序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
//序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
//执行setnx操作
Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
return result;
};
//获取分布式锁
Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
return lock;
}
/**
* 释放分布式锁
*/
public boolean unLock() {
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
List<String> keys = Arrays.asList(key);
Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
log.info("释放锁的结果:"+result);
return result;
}
@Override
public void close() throws Exception {
unLock();
}
}
基于Zookeeper的瞬时节点实现分布式锁
Zookeeper的数据结构
Zookeeper的下载安装
Zookeeper的观察器
实现原理
利用Zookeeper的瞬时有序节点的特性
多线程并发创建瞬时节点时,得到有序的序列
序号最小的线程获得锁
其他的线程则监听自己序号的前一个序号
前一个线程执行完成,删除自己序号的节点
下一个序号的线程得到通知,继续执行
以此类推
创建节点时,已经确定了线程的执行顺序
代码实现
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String znode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("localhost:2181",
10000,this);
}
public boolean getLock(String businessCode) {
try {
//创建业务 根节点 持久节点
Stat stat = zooKeeper.exists("/" + businessCode, false);
if (stat==null){
zooKeeper.create("/" + businessCode,businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//创建瞬时有序节点 /order/order_00000001
znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//获取业务节点下 所有的子节点
List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
//子节点排序
Collections.sort(childrenNodes);
//获取序号最小的(第一个)子节点
String firstNode = childrenNodes.get(0);
//如果创建的节点是第一个子节点,则获得锁
if (znode.endsWith(firstNode)){
return true;
}
//不是第一个子节点,则监听前一个节点
String lastNode = firstNode;
for (String node:childrenNodes){
if (znode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() throws Exception {
zooKeeper.delete(znode,-1);
zooKeeper.close();
log.info("我已经释放了锁!");
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
@RequestMapping("zkLock")
public String zookeeperLock(){
log.info("我进入了方法!");
try (ZkLock zkLock = new ZkLock()) {
if (zkLock.getLock("order")){
log.info("我获得了锁");
Thread.sleep(10000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成!");
return "方法执行完成!";
}
基于Zookeeper的Curator客户端实现分布式锁
- 引入curator客户端
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
curator已经实现了分布式锁的方法
直接调用即可
@SpringBootApplication
public class DistributeZkLockApplication {
public static void main(String[] args) {
SpringApplication.run(DistributeZkLockApplication.class, args);
}
@Bean(initMethod="start",destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
return client;
}
}
@Autowired
private CuratorFramework client;
@RequestMapping("curatorLock")
public String curatorLock(){
log.info("我进入了方法!");
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try{
if (lock.acquire(30, TimeUnit.SECONDS)){
log.info("我获得了锁!!");
Thread.sleep(10000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
log.info("我释放了锁!!");
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("方法执行完成!");
return "方法执行完成!";
}
基于Redis的Redisson客户端实现分布式锁
引入Redisson的jar包
进行Redisson与Redis的配置
使用分布式锁
通过JAVA API方式引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
public void testRedissonLock() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.73.130:6379");
RedissonClient redisson = Redisson.create(config);
RLock rLock = redisson.getLock("order");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
}
- Spring项目引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
@SpringBootApplication
@ImportResource("classpath*:redisson.xml")
public class RedissonLockApplication {
public static void main(String[] args) {
SpringApplication.run(RedissonLockApplication.class, args);
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:redisson="http://redisson.org/schema/redisson"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://redisson.org/schema/redisson
http://redisson.org/schema/redisson/redisson.xsd
">
<redisson:client>
<redisson:single-server address="redis://192.168.73.130:6379"/>
</redisson:client>
</beans>
- Spring Boot项目引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.2</version>
</dependency>
spring.redis.host=192.168.73.130
@Autowired
private RedissonClient redisson;
@RequestMapping("redissonLock")
public String redissonLock() {
RLock rLock = redisson.getLock("order");
log.info("我进入了方法!!");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
log.info("方法执行完成!!");
return "方法执行完成!!";
}
基于分布式锁解决定时任务重复问题
@Service
@Slf4j
public class SchedulerService {
@Autowired
private RedisTemplate redisTemplate;
@Scheduled(cron = "0/5 * * * * ?")
public void sendSms(){
try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) {
if (redisLock.getLock()){
log.info("向138xxxxxxxx发送短信!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
分布式锁实现方案对比
分布式锁技术落地-应用到天天吃货
- 引入Redisson pom依赖
<!-- 分布式锁【1】引入 redisson 依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.0</version>
</dependency>
- 引入RedissonClient客户端依赖
//分布式锁【2】自动注入
@Autowired
private RedissonClient redissonClient;
加锁
/**
* 分布式锁【3】 编写业务代码
* 1、Redisson是基于Redis,使用Redisson之前,项目必须使用Redis
* 2、注意getLock方法中的参数,以specId作为参数,每个specId一个key,和
* 数据库中的行锁是一致的,不会是方法级别的锁
*/
RLock rLock = redissonClient.getLock("SPECID_" + specId);
try {
/**
* 1、获取分布式锁,锁的超时时间是5秒get
* 2、获取到了锁,进行后续的业务操作
*/
rLock.lock(5, TimeUnit.HOURS);
int result = itemsMapperCustom.decreaseItemSpecStock(specId, buyCounts);
if (result != 1) {
throw new RuntimeException("订单创建失败,原因:库存不足!");
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
/**
* 不管业务是否操作正确,随后都要释放掉分布式锁
* 如果不释放,过了超时时间也会自动释放
*/
rLock.unlock();
}