使用场景


Distributed locks are a very useful primitive in many environments where different processes must operate with shared resources in a mutually exclusive way.

分布式系统控制共享资源的访问.单机情况下,一个 java 进程中,如果要控制多线程对共享资源的访问,加锁即可。但是在分布式系统,如何保证同一时间,在分布式集群中只有一个线程访问共享资源,只加锁,是没有办法做到的,这时候就需要分布式锁。

分布式锁实现方式

1.基于redis

redis 实现的分布式锁,需要使用到的 redis 命令;

  1. SET resource_name my_random_value NX PX 30000
  • my_random_value是由客户端生成的一个随机字符串,相当于是客户端持有锁的标志
  • NX表示只有当resource_name对应的key值不存在的时候才能SET成功,相当于只有第一个请求的客户端才能获得锁
  • PX 30000表示这个锁有一个30秒的自动过期时间。

当且仅有 resource_name 这个key不存在,才能设置成功。value 为 my_random_value,这个 value 必须是全局唯一的。

锁的释放:为了防止客户端1持有的锁被客户端2释放,可以使用下面的 lua 脚本释放锁。只有当 value 与客户端传入的 value 相等时,才会释放锁。

  1. if redis.call("get",KEYS[1]) == ARGV[1] then
  2. return redis.call("del",KEYS[1])
  3. else
  4. return 0
  5. end

在执行这段LUA脚本的时候,KEYS[1] 的值为resource_name,ARGV[1] 的值为 my_random_value。原理就是先获取锁对应的 value值,保证和客户端传进去的 my_random_value值相等,这样就能避免自己的锁被其他人释放。另外,采取 Lua 脚本操作保证了原子性.如果不是原子性操作,则有了下述情况出现,一个客户端释放了另外一个客户端的锁。
image.png

现在存在这样一个问题:如果一个线程持有了锁,到了超时时间锁被释放,但是业务仍然没有处理完。这时候,另外一个线程获取了锁,这时候共享资源就不是线程安全的了。redisson 的 watchDog 自动延时机制解决了这个问题。只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1 还持有锁 key,那么就会不断的延长锁 key 的生存时间。

开发中可以使用 redisson 去实现,这也是官方推荐的方式。

项目上实现的 redis 锁,使用lua 脚本解锁,使用 ThreadLocal 做了可重入判断:

  1. public class RedisService {
  2. private StringRedisTemplate stringRedisTemplate;
  3. @Value("${distribute.lock.MaxSeconds:100}")
  4. private Integer lockMaxSeconds;
  5. private static Long LOCK_WAIT_MAX_TIME = 120000L;
  6. /**
  7. * 保存锁的value
  8. */
  9. private TransmittableThreadLocal<String> redisLockReentrant = new TransmittableThreadLocal<>();
  10. /** 解锁lua脚本 */
  11. private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  12. /**
  13. * 获取redis分布式锁
  14. *
  15. * @param keyPrefix redis锁 key前缀
  16. * @param key key
  17. * @return
  18. */
  19. public boolean redisLock(String keyPrefix, String key) {
  20. long endTime = System.currentTimeMillis() + LOCK_WAIT_MAX_TIME;
  21. //可重入锁判断
  22. String originValue = redisLockReentrant.get();
  23. String lockKey = getLockKey(keyPrefix, key);
  24. if (!StringUtils.isBlank(originValue) && isReentrantLock(lockKey, originValue)) {
  25. return true;
  26. }
  27. RedisCallback<Boolean> callback = (connection) -> connection.set(
  28. lockKey.getBytes(StandardCharsets.UTF_8),
  29. key.getBytes(StandardCharsets.UTF_8),
  30. Expiration.seconds(lockMaxSeconds),
  31. RedisStringCommands.SetOption.SET_IF_ABSENT);
  32. //在timeout时间内仍未获取到锁,则获取失败
  33. try {
  34. while (System.currentTimeMillis() < endTime) {
  35. if (stringRedisTemplate.execute(callback)) {
  36. redisLockReentrant.set(key);
  37. return true;
  38. }
  39. Thread.sleep(100);
  40. }
  41. } catch (InterruptedException e) {
  42. logger.error("获取redis分布式锁出错", e);
  43. }
  44. return false;
  45. }
  46. /**
  47. * 释放分布式锁
  48. * @param keyPrefix redis锁 key前缀
  49. * @param key key
  50. * @return
  51. */
  52. public Boolean redisUnLock(String keyPrefix, String key) {
  53. String lockKey = getLockKey(keyPrefix, key);
  54. RedisCallback<Boolean> callback = (connection) -> connection.eval(
  55. RELEASE_LOCK_SCRIPT.getBytes(),
  56. ReturnType.BOOLEAN, 1,
  57. lockKey.getBytes(StandardCharsets.UTF_8),
  58. key.getBytes(StandardCharsets.UTF_8));
  59. //清空 ThreadLocal
  60. redisLockReentrant.remove();
  61. return stringRedisTemplate.execute(callback);
  62. }
  63. private String getLockKey(String keyPrefix, String key) {
  64. return keyPrefix + "-" + key;
  65. }
  66. /**
  67. * 是否为重入锁
  68. */
  69. private boolean isReentrantLock(String lockKey, String originValue) {
  70. String redisValue = stringRedisTemplate.opsForValue().get(lockKey);
  71. return StringUtils.isNotBlank(redisValue) && originValue.equals(redisValue);
  72. }
  73. }

2.基于数据库锁

悲观锁:一段执行逻辑加上悲观锁,不同线程同时执行时,只能有一个线程执行,其他的线程在入口处等待,直到锁被释放。

乐观锁:一段执行逻辑加上乐观锁,不同线程同时执行时,可以同时进入执行,在最后更新数据的时候要检查这些数据是否被其他线程修改了(版本和执行初是否相同),没有修改则进行更新,否则放弃本次操作。

悲观锁的实现:

//0.开始事务
begin;/begin work;/start transaction; (三者选一就可以)
//1.查询出商品信息
select status from t_goods where id=1 for update;
//2.根据商品信息生成订单
insert into t_orders (id,goods_id) values (null,1);
//3.修改商品status为2
update t_goods set status=2;
//4.提交事务
commit;/commit work;

乐观锁的实现

1.查询出商品信息
select (status,status,version) from t_goods where id=#{id}
2.根据商品信息生成订单
3.修改商品status为2
update t_goods 
set status=2,version=version+1
where id=#{id} and version=#{version};

3.zookeeper 分布式锁

zookeeper是一个为分布式应用提供一致性服务的软件,它内部是一个分层的文件系统目录树结构,规定统一个目录下只能有一个唯一文件名
数据模型

  • 永久节点 节点创建后,不会因为会话失效而消失
  • 临时节点 与永久节点相反,如果客户端连接失效,则立即删除节点
  • 顺序节点 与上述两个节点特性类似,如果指定创建这类节点时,zk会自动在节点名后加一个数字后缀,并且是有序的 监视器(watcher): 当创建一个节点时,可以注册一个该节点的监视器,当节点状态发生改变时,watch 被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,因为watch只能被触发一次

根据zookeeper的这些特性来实现分布式锁

  1. 创建一个锁目录lock
  2. 希望获得锁的线程A就在lock目录下,创建临时顺序节点
  3. 获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁
  4. 线程B获取所有节点,判断自己不是最小节点,设置监听(watcher)比自己次小的节点(只关注比自己次小的节点是为了防止发生“羊群效应”)
  5. 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是最小的节点,获得锁。

zklock.png

参考:https://redis.io/topics/distlock
https://mp.weixin.qq.com/s/0wmVSfrkFq7BfpUvydr-ug
https://www.cnblogs.com/AnXinliang/p/10019389.html