前言

分布式锁要解决两个问题:
1、锁竞争
2、死锁
以redis为例,redis提供了setnx来保证原子写入,只有一个客户端能写入成功,也就能成功获得锁。同时为了防止客户端异常导致锁没有及时释放,可以对这个锁设置过期s时间,命令如下:

  1. SET lock_name my_random_value NX PX 30000

除了锁自动过期以外,还需要能手动释放锁,命令如下:

  1. del lock_name

etcd的实现方式

etcd提供了以下几种特性来实现分布式锁:

  • Lease机制租约机制(TTL,Time To Live),etcd 可以为存储的 key-value 对设置租约,当租约到期,key-value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免 key-value 对过期失效。Lease机制可以保证分布式锁的安全性,为锁对应的 key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。
  • Revision机制每个 key 带有一个 Revision 号,每进行一次事务便+1,它是全局唯一的,通过 Revision 的大小就可以知道进行写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” ,实现公平锁。这和zookeeper的临时顺序节点+监听机制可避免羊群效应的原理是一致的。
  • Prefix机制即前缀机制。例如,一个名为 /etcd/lock 的锁,两个争抢它的客户端进行写操作,实际写入的 key 分别为:key1=”/etcd/lock/UUID1”,key2=”/etcd/lock/UUID2”。其中,UUID 表示全局唯一的 ID,确保两个 key 的唯一性。写操作都会成功,但返回的 Revision 不一样,那么,如何判断谁获得了锁呢?通过前缀 /etcd/lock 查询,返回包含两个 key-value 对的的 KeyValue 列表,同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁 。
  • Watch机制即监听机制。Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个范围(前缀机制)。当被 Watch 的 key 或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过 Prefix 机制返回的 Key-Value 列表获得 Revision 比自己小且相差最小的 key(称为 pre-key),对 pre-key 进行监听,因为只有它释放锁,自己才能获得锁,如果 Watch 到 pre-key 的 DELETE 事件,则说明 pre-key 已经释放,自己将持有锁。

实现流程如下:

  1. 建立连接客户端连接 etcd,以 /etcd/lock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key=”/etcd/lock/UUID1”,第二个为 key=”/etcd/lock/UUID2”;客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定。
  2. 创建定时任务作为租约的“心跳”
    当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。
  3. 客户端将自己全局唯一的 key 写入 etcd
    执行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,
    假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用来判断自己是否获得锁。
  4. 客户端判断是否获得锁
    客户端以前缀 /etcd/lock/ 读取 key-Value 列表,判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
  5. 执行业务获得锁后,操作共享资源,执行业务代码。

image.png
image.png

demo

首先要准备数据
image.png

  1. <!-- jetcd -->
  2. <dependency>
  3. <groupId>io.etcd</groupId>
  4. <artifactId>jetcd-all</artifactId>
  5. <version>0.5.11</version>
  6. </dependency>
  7. <!-- redis -->
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-data-redis</artifactId>
  11. </dependency>

添加application.yml配置

  1. server:
  2. port: 8080
  3. logging:
  4. config: classpath:logback-spring.xml
  5. etcd:
  6. servers: http://192.168.1.30:2379,http://192.168.1.31:2379,http://192.168.1.32:2379
  7. lockPath: /lock/stock
  8. spring:
  9. redis:
  10. host: 127.0.0.1
  11. password:

etcd分布式锁的实现

  1. @Data
  2. public class EtcdDistributedLock extends AbstractLock {
  3. private final static Logger LOGGER = LoggerFactory.getLogger(EtcdDistributedLock.class);
  4. private Client client;
  5. private Lock lockClient;
  6. private Lease leaseClient;
  7. private String lockKey;
  8. //锁路径,方便记录日志
  9. private String lockPath;
  10. //锁的次数
  11. private AtomicInteger lockCount;
  12. //租约有效期。作用 1:客户端崩溃,租约到期后自动释放锁,防止死锁 2:正常执行自动进行续租
  13. private Long leaseTTL;
  14. //续约锁租期的定时任务,初次启动延迟,默认为1s,根据实际业务需要设置
  15. private Long initialDelay = 0L;
  16. //定时任务线程池
  17. ScheduledExecutorService scheduledExecutorService;
  18. //线程与锁对象的映射
  19. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  20. public EtcdDistributedLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
  21. this.client = client;
  22. this.lockClient = client.getLockClient();
  23. this.leaseClient = client.getLeaseClient();
  24. this.lockKey = lockKey;
  25. this.leaseTTL = unit.toNanos(leaseTTL);
  26. scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
  27. }
  28. @Override
  29. public void lock() {
  30. Thread currentThread = Thread.currentThread();
  31. LockData existsLockData = threadData.get(currentThread);
  32. //System.out.println(currentThread.getName() + " 加锁 existsLockData:" + existsLockData);
  33. //锁重入
  34. if (existsLockData != null && existsLockData.isLockSuccess()) {
  35. int lockCount = existsLockData.lockCount.incrementAndGet();
  36. if (lockCount < 0) {
  37. throw new Error("超出etcd锁可重入次数限制");
  38. }
  39. return;
  40. }
  41. //创建租约,记录租约id
  42. long leaseId;
  43. try {
  44. leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
  45. //续租心跳周期
  46. long period = leaseTTL - leaseTTL / 5;
  47. //启动定时续约
  48. scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId),
  49. initialDelay,
  50. period,
  51. TimeUnit.NANOSECONDS);
  52. //加锁
  53. LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
  54. if (lockResponse != null) {
  55. lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
  56. LOGGER.info("线程:{} 加锁成功,锁路径:{}", currentThread.getName(), lockPath);
  57. }
  58. //加锁成功,设置锁对象
  59. LockData lockData = new LockData(lockKey, currentThread);
  60. lockData.setLeaseId(leaseId);
  61. lockData.setService(scheduledExecutorService);
  62. threadData.put(currentThread, lockData);
  63. lockData.setLockSuccess(true);
  64. } catch (InterruptedException | ExecutionException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. @Override
  69. public void unlock() {
  70. Thread currentThread = Thread.currentThread();
  71. //System.out.println(currentThread.getName() + " 释放锁..");
  72. LockData lockData = threadData.get(currentThread);
  73. //System.out.println(currentThread.getName() + " lockData " + lockData);
  74. if (lockData == null) {
  75. throw new IllegalMonitorStateException("线程:" + currentThread.getName() + " 没有获得锁,lockKey:" + lockKey);
  76. }
  77. int lockCount = lockData.lockCount.decrementAndGet();
  78. if (lockCount > 0) {
  79. return;
  80. }
  81. if (lockCount < 0) {
  82. throw new IllegalMonitorStateException("线程:" + currentThread.getName() + " 锁次数为负数,lockKey:" + lockKey);
  83. }
  84. try {
  85. //正常释放锁
  86. if (lockPath != null) {
  87. lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
  88. }
  89. //关闭续约的定时任务
  90. lockData.getService().shutdown();
  91. //删除租约
  92. if (lockData.getLeaseId() != 0L) {
  93. leaseClient.revoke(lockData.getLeaseId());
  94. }
  95. } catch (InterruptedException | ExecutionException e) {
  96. //e.printStackTrace();
  97. LOGGER.error("线程:" + currentThread.getName() + "解锁失败。", e);
  98. } finally {
  99. //移除当前线程资源
  100. threadData.remove(currentThread);
  101. }
  102. LOGGER.info("线程:{} 释放锁", currentThread.getName());
  103. }
  104. }

接口测试

  1. @RestController
  2. public class StockController {
  3. private final StringRedisTemplate redisTemplate;
  4. @Value("${server.port}")
  5. private String port;
  6. @Value("${etcd.lockPath}")
  7. private String lockKey;
  8. private final Client etcdClient;
  9. public StockController(StringRedisTemplate redisTemplate, @Value("${etcd.servers}") String servers) {
  10. //System.out.println("etcd servers:" + servers);
  11. this.redisTemplate = redisTemplate;
  12. this.etcdClient = Client.builder().endpoints(servers.split(",")).build();
  13. }
  14. @RequestMapping("/stock/reduce")
  15. public String reduceStock() {
  16. Lock lock = new EtcdDistributedLock(etcdClient, lockKey, 30L, TimeUnit.SECONDS);
  17. //获得锁
  18. lock.lock();
  19. //扣减库存
  20. int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
  21. if (stock > 0) {
  22. int realStock = stock - 1;
  23. redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  24. //同时lucky+1
  25. redisTemplate.opsForValue().increment("lucky");
  26. } else {
  27. System.out.println("库存不足");
  28. }
  29. //释放锁
  30. lock.unlock();
  31. return port + " reduce stock end!";
  32. }
  33. }

参考链接:https://juejin.cn/post/6883866765890322445