分布式锁

官网:
https://curator.apache.org/getting-started.html

加锁原理:
image.png

非公平锁

所有请求在请求同一个节点
无法控制先来后到顺序的
不断请求锁的性能消耗
养群效应,都在抢资源

公平锁

临时顺序节点
判断最小的节点就是获得者,
image.png

  1. 请求会在lock创建临时顺序节点
  2. 判断是不是当前节点最小的节点
    1. 是,获得suo
    2. 不是,监听前面一个节点即可watch
  3. 获取的锁,处理业务释放说,清除节点
    1. 后一个节点会会收到通知

避免竞争,缓解压力,对加锁排队,公平锁

节点异常删除的场景

幽灵节点
创建成功,返回失败创建了一个客户端感知不到的节点
解决方案:protection模式
自带个ID, 去判断是否已存在

示例代码

  1. public Object reduceStock(Integer id) throws Exception {
  2. //互斥锁
  3. InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);
  4. try {
  5. // 获取锁
  6. interProcessMutex.acquire();
  7. //处理业务逻辑
  8. } catch (Exception e) {
  9. if (e instanceof RuntimeException) {
  10. throw e;
  11. }
  12. }finally {
  13. //释放锁
  14. interProcessMutex.release();
  15. }
  16. return "success";
  17. }

源码分析

  1. private boolean internalLock(long time, TimeUnit unit) throws Exception
  2. {
  3. /*
  4. Note on concurrency: a given lockData instance
  5. can be only acted on by a single thread so locking isn't necessary
  6. */
  7. Thread currentThread = Thread.currentThread();
  8. //获取锁
  9. LockData lockData = threadData.get(currentThread);
  10. if ( lockData != null )
  11. {
  12. // re-entering重入
  13. lockData.lockCount.incrementAndGet();
  14. return true;
  15. }
  16. //把锁跟当前线程关联
  17. String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  18. if ( lockPath != null )
  19. {
  20. LockData newLockData = new LockData(currentThread, lockPath);
  21. threadData.put(currentThread, newLockData);
  22. return true;
  23. }
  24. return false;
  25. }
  1. @Override
  2. public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
  3. {
  4. String ourPath;
  5. if ( lockNodeBytes != null )
  6. {
  7. //容器节点:没有子节点会自动被删除
  8. ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
  9. }
  10. else
  11. {
  12. ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
  13. }
  14. return ourPath;
  15. }


InterProcessMutex

过程

网络拿到的所有子节点的结果不是顺序的

独占锁

非独占锁

读写锁:
根据节点前缀判断
1653289580(1).png

示例code

  1. public Object reduceStock(Integer id) throws Exception {
  2. InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/product_" + id);
  3. InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
  4. try {
  5. // ...
  6. writeLock.acquire();
  7. orderService.reduceStock(id);
  8. } catch (Exception e) {
  9. if (e instanceof RuntimeException) {
  10. throw e;
  11. }
  12. }finally {
  13. writeLock.release();
  14. }
  15. return "ok:" + port;
  16. }

maxlease : 
不同点:判断加锁逻辑上
源码

  1. public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
  2. {
  3. lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
  4. //写锁与互斥锁是一样的
  5. writeMutex = new InternalInterProcessMutex
  6. (
  7. client,
  8. basePath,
  9. WRITE_LOCK_NAME,
  10. lockData,
  11. 1,
  12. new SortingLockInternalsDriver()
  13. {
  14. @Override
  15. public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
  16. {
  17. return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
  18. }
  19. }
  20. );
  21. //读锁
  22. readMutex = new InternalInterProcessMutex
  23. (
  24. client,
  25. basePath,
  26. READ_LOCK_NAME,
  27. lockData,
  28. Integer.MAX_VALUE,
  29. new SortingLockInternalsDriver()
  30. {
  31. @Override
  32. public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
  33. {
  34. return readLockPredicate(children, sequenceNodeName);
  35. }
  36. }
  37. );
  38. }
  1. public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
  2. lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length);
  3. this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
  4. public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
  5. return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
  6. }
  7. });
  8. this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
  9. public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
  10. return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName);
  11. }
  12. });
  13. }

选举,获取leader

底层也是分布式锁的方式获取互斥锁

代码示例

  1. String appName = System.getProperty("appName");
  2. CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);
  3. LeaderSelectorDemo.curatorFramework = curatorFramework;
  4. curatorFramework.start();//这样才会建立链接,而不是只是个示例类
  5. //这个是获取成为leader后的回到
  6. LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
  7. {
  8. public void takeLeadership(CuratorFramework client) throws Exception
  9. {
  10. System.out.println(" I' m leader now . i'm , "+appName);
  11. }
  12. };
  13. LeaderSelector selector = new LeaderSelector(curatorFramework, "/cachePreHeat_leader", listener);
  14. selector.autoRequeue(); // not required, but this is behavior that you will probably expect
  15. selector.start();//这样才会会发起抢占leader
  16. selector.hasLeadership();//判断当前是否是leader

类:

LeaderLatch
只有close才能释放leader
LeaderSelector
调用takeLeadership就能释放,让其他节点获取leader

幽灵节点

withProtection方式实现的

注册中心的使用

注册:创建节点信息
spring cloud集成了注册中心
spring cloud官网有的

流程:

  1. 启动时,在applicationName路径下创建临时子节点,以及相关数据
  2. 调用方通过applicationName获取所有子节点的信息,ip,端口等
  3. 调用方缓存在本地,同时监听临时节点的变化
  4. 如果applicationName下节点变化,就会重新请求重新本地缓存

image.png

Spring Cloud 生态也提供了Zookeeper注册中心的实现

maven

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
  4. </dependency>

配置文件:

spring.application.name=user-center

zookeeper 连接地址 ,

如果使用了 spring cloud zookeeper config这个配置应该配置在 bootstrap.yml/bootstrap.properties中 spring.cloud.zookeeper.connect-string=192.168.109.200:2181

将本服务注册到zookeeper,如果不希望自己被发现可以配置为false, 默认为 true spring.cloud.zookeeper.discovery.register=true