什么是分布式锁
单体应用锁:在一个JVM进程内有效,无法跨JVM、跨进程
分布式锁:可以跨越多个JVM、跨越多个进程的锁
分布式锁的设计思路
由于Tomcat是由Java启动的,所以每个Tomcat可以看成一个JVM,JVM内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些JVM之外去寻找,通过其他的组件来实现分布式锁。系统的架构如图所示:

两个Tomcat通过第三方的组件实现跨JVM、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有JVM可以共同访问的第三方组件,通过第三方组件实现分布式锁。
分布式锁方案
分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:
- 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
- Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
- Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。
超卖
什么是超卖?
- 商品卖出数量超过库存数量
超卖现象一
系统中库存1,但是产生两笔订单
商品存库1,A和B同时看到商品,加入购物车,同时提交订单
产生原因:
- 扣减库存的动作,在程序中进行,在程序中计算剩余库存

- 扣减库存的动作,在程序中进行,在程序中计算剩余库存
解决方法:
- 扣减库存不在程序中进行,而是通过数据库
- 向数据库传递库存增量,扣减一个库存,增量为-1
- 在数据库update语句计算库存,通过update行锁解决并发
超卖现象二
系统中库存变为-1
卖家不知所措,询问平台客服
产生原因:
并发检验库存,造成库存充足的假象
update更新库存,导致库存为负数

解决方法:
- 校验库存、扣减库存统一加锁
- 使之成为原子性的操作
- 并发时,只有获得锁的线程才能校验、扣减库存
- 扣减库存后,释放锁
- 确保库存不会扣成负数
基于Synchronized锁解决超卖问题(最原始的锁)
基于ReentrantLock锁解决并发超卖问题(并发包中的锁)
基于数据库悲观锁的分布式锁
多个进程、多个线程访问共同组件数据库
通过select…….for update访问一条数据
for update锁定数据,其他线程只能等待
<select id="selectDistributeLock" resultType="com.yy.distributelock.model.DistributeLock">select * from distribute_lockwhere business_code = #{businessCode,jdbcType=VARCHAR}for update</select>
@RequestMapping("dbLock")@Transactional(rollbackFor = Exception.class)public String dbLock() throws Exception {log.info("我进入了方法!");DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");if (distributeLock==null) throw new Exception("分布式锁找不到");log.info("我进入了锁!");try {Thread.sleep(20000);} catch (InterruptedException e) {e.printStackTrace();}return "我已经执行完成!";}
基于数据库设计分布式锁优缺点
- 优点:简单方便、易于理解、易于操作
- 缺点:并发量大时,对数据库压力较大
- 建议:作为锁的数据库与业务数据库分开
基于Redis的Setnx实现分布式锁
实现原理
获取锁的Redis命令
SET key my_random_value NX PX 30000
- key:资源名称,可根据不同的业务区分不同的锁
- my_random_value:随机值,每个线程的随机值都不同,用于释放锁时的校验
- NX:key不存在时设置成功,key存在时设置不成功
- PX:自动失效的时间,出现异常情况,锁可以过期失效
利用NX的原子性,多个线程并发时,只有一个线程可以设置成功(因为Redis单线程)
设置成功即获得锁,可以执行后续的业务处理
如果出现异常,过了锁的有效期,锁自动释放
释放锁采用Redis的delete命令
锁释放时校验之前设置的随机数,相同才能释放
释放锁的LUA脚本
if redis.call("get",KEYS[1]) == ARGV[1] thenreturn redis.call("del",KEYS[1])elsereturn 0end
- 代码实现
@RequestMapping("redisLock")public String redisLock(){log.info("我进入了方法!");try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){if (redisLock.getLock()) {log.info("我进入了锁!!");Thread.sleep(15000);}} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}log.info("方法执行完成");return "方法执行完成";}
@Slf4jpublic class RedisLock implements AutoCloseable {private RedisTemplate redisTemplate;private String key;private String value;//单位:秒private int expireTime;public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){this.redisTemplate = redisTemplate;this.key = key;this.expireTime=expireTime;this.value = UUID.randomUUID().toString();}/*** 获取分布式锁* @return*/public boolean getLock(){RedisCallback<Boolean> redisCallback = connection -> {//设置NXRedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();//设置过期时间Expiration expiration = Expiration.seconds(expireTime);//序列化keybyte[] redisKey = redisTemplate.getKeySerializer().serialize(key);//序列化valuebyte[] redisValue = redisTemplate.getValueSerializer().serialize(value);//执行setnx操作Boolean result = connection.set(redisKey, redisValue, expiration, setOption);return result;};//获取分布式锁Boolean lock = (Boolean)redisTemplate.execute(redisCallback);return lock;}/*** 释放分布式锁*/public boolean unLock() {String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +" return redis.call(\"del\",KEYS[1])\n" +"else\n" +" return 0\n" +"end";RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);List<String> keys = Arrays.asList(key);Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);log.info("释放锁的结果:"+result);return result;}@Overridepublic void close() throws Exception {unLock();}}
基于Zookeeper的瞬时节点实现分布式锁
Zookeeper的数据结构

Zookeeper的下载安装
Zookeeper的观察器

实现原理
利用Zookeeper的瞬时有序节点的特性
多线程并发创建瞬时节点时,得到有序的序列
序号最小的线程获得锁
其他的线程则监听自己序号的前一个序号
前一个线程执行完成,删除自己序号的节点
下一个序号的线程得到通知,继续执行
以此类推
创建节点时,已经确定了线程的执行顺序

代码实现
@Slf4jpublic class ZkLock implements AutoCloseable, Watcher {private ZooKeeper zooKeeper;private String znode;public ZkLock() throws IOException {this.zooKeeper = new ZooKeeper("localhost:2181",10000,this);}public boolean getLock(String businessCode) {try {//创建业务 根节点 持久节点Stat stat = zooKeeper.exists("/" + businessCode, false);if (stat==null){zooKeeper.create("/" + businessCode,businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}//创建瞬时有序节点 /order/order_00000001znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//获取业务节点下 所有的子节点List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);//子节点排序Collections.sort(childrenNodes);//获取序号最小的(第一个)子节点String firstNode = childrenNodes.get(0);//如果创建的节点是第一个子节点,则获得锁if (znode.endsWith(firstNode)){return true;}//不是第一个子节点,则监听前一个节点String lastNode = firstNode;for (String node:childrenNodes){if (znode.endsWith(node)){zooKeeper.exists("/"+businessCode+"/"+lastNode,true);break;}else {lastNode = node;}}synchronized (this){wait();}return true;} catch (Exception e) {e.printStackTrace();}return false;}@Overridepublic void close() throws Exception {zooKeeper.delete(znode,-1);zooKeeper.close();log.info("我已经释放了锁!");}@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted){synchronized (this){notify();}}}}
@RequestMapping("zkLock")public String zookeeperLock(){log.info("我进入了方法!");try (ZkLock zkLock = new ZkLock()) {if (zkLock.getLock("order")){log.info("我获得了锁");Thread.sleep(10000);}} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}log.info("方法执行完成!");return "方法执行完成!";}
基于Zookeeper的Curator客户端实现分布式锁
- 引入curator客户端
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version></dependency>
curator已经实现了分布式锁的方法
直接调用即可
@SpringBootApplicationpublic class DistributeZkLockApplication {public static void main(String[] args) {SpringApplication.run(DistributeZkLockApplication.class, args);}@Bean(initMethod="start",destroyMethod = "close")public CuratorFramework getCuratorFramework() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);return client;}}
@Autowiredprivate CuratorFramework client;@RequestMapping("curatorLock")public String curatorLock(){log.info("我进入了方法!");InterProcessMutex lock = new InterProcessMutex(client, "/order");try{if (lock.acquire(30, TimeUnit.SECONDS)){log.info("我获得了锁!!");Thread.sleep(10000);}} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}finally {try {log.info("我释放了锁!!");lock.release();} catch (Exception e) {e.printStackTrace();}}log.info("方法执行完成!");return "方法执行完成!";}
基于Redis的Redisson客户端实现分布式锁
引入Redisson的jar包
进行Redisson与Redis的配置
使用分布式锁
通过JAVA API方式引入Redisson
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version></dependency>
public void testRedissonLock() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.73.130:6379");RedissonClient redisson = Redisson.create(config);RLock rLock = redisson.getLock("order");try {rLock.lock(30, TimeUnit.SECONDS);log.info("我获得了锁!!!");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}finally {log.info("我释放了锁!!");rLock.unlock();}}
- Spring项目引入Redisson
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version></dependency>
@SpringBootApplication@ImportResource("classpath*:redisson.xml")public class RedissonLockApplication {public static void main(String[] args) {SpringApplication.run(RedissonLockApplication.class, args);}}
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:redisson="http://redisson.org/schema/redisson"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://redisson.org/schema/redissonhttp://redisson.org/schema/redisson/redisson.xsd"><redisson:client><redisson:single-server address="redis://192.168.73.130:6379"/></redisson:client></beans>
- Spring Boot项目引入Redisson
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.2</version></dependency>
spring.redis.host=192.168.73.130
@Autowiredprivate RedissonClient redisson;@RequestMapping("redissonLock")public String redissonLock() {RLock rLock = redisson.getLock("order");log.info("我进入了方法!!");try {rLock.lock(30, TimeUnit.SECONDS);log.info("我获得了锁!!!");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}finally {log.info("我释放了锁!!");rLock.unlock();}log.info("方法执行完成!!");return "方法执行完成!!";}
基于分布式锁解决定时任务重复问题
@Service@Slf4jpublic class SchedulerService {@Autowiredprivate RedisTemplate redisTemplate;@Scheduled(cron = "0/5 * * * * ?")public void sendSms(){try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) {if (redisLock.getLock()){log.info("向138xxxxxxxx发送短信!");}} catch (Exception e) {e.printStackTrace();}}}
分布式锁实现方案对比


分布式锁技术落地-应用到天天吃货
- 引入Redisson pom依赖
<!-- 分布式锁【1】引入 redisson 依赖 --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.12.0</version></dependency>
- 引入RedissonClient客户端依赖
//分布式锁【2】自动注入@Autowiredprivate RedissonClient redissonClient;
加锁
/*** 分布式锁【3】 编写业务代码* 1、Redisson是基于Redis,使用Redisson之前,项目必须使用Redis* 2、注意getLock方法中的参数,以specId作为参数,每个specId一个key,和* 数据库中的行锁是一致的,不会是方法级别的锁*/RLock rLock = redissonClient.getLock("SPECID_" + specId);try {/*** 1、获取分布式锁,锁的超时时间是5秒get* 2、获取到了锁,进行后续的业务操作*/rLock.lock(5, TimeUnit.HOURS);int result = itemsMapperCustom.decreaseItemSpecStock(specId, buyCounts);if (result != 1) {throw new RuntimeException("订单创建失败,原因:库存不足!");}} catch (Exception e) {log.error(e.getMessage(), e);throw new RuntimeException(e.getMessage(), e);} finally {/*** 不管业务是否操作正确,随后都要释放掉分布式锁* 如果不释放,过了超时时间也会自动释放*/rLock.unlock();}
