分布式锁
官网:
https://curator.apache.org/getting-started.html
非公平锁
所有请求在请求同一个节点
无法控制先来后到顺序的
不断请求锁的性能消耗
养群效应,都在抢资源
公平锁
临时顺序节点
判断最小的节点就是获得者,
- 请求会在lock创建临时顺序节点
- 判断是不是当前节点最小的节点
- 是,获得suo
- 不是,监听前面一个节点即可watch
- 获取的锁,处理业务释放说,清除节点
- 后一个节点会会收到通知
避免竞争,缓解压力,对加锁排队,公平锁
节点异常删除的场景
幽灵节点
创建成功,返回失败创建了一个客户端感知不到的节点
解决方案:protection模式
自带个ID, 去判断是否已存在
示例代码
public Object reduceStock(Integer id) throws Exception {//互斥锁InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);try {// 获取锁interProcessMutex.acquire();//处理业务逻辑} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {//释放锁interProcessMutex.release();}return "success";}
源码分析
private boolean internalLock(long time, TimeUnit unit) throws Exception{/*Note on concurrency: a given lockData instancecan be only acted on by a single thread so locking isn't necessary*/Thread currentThread = Thread.currentThread();//获取锁LockData lockData = threadData.get(currentThread);if ( lockData != null ){// re-entering重入lockData.lockCount.incrementAndGet();return true;}//把锁跟当前线程关联String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if ( lockPath != null ){LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;}
@Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{String ourPath;if ( lockNodeBytes != null ){//容器节点:没有子节点会自动被删除ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);}else{ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}
类
InterProcessMutex
过程
网络拿到的所有子节点的结果不是顺序的
独占锁
非独占锁
读写锁:
根据节点前缀判断
示例code
public Object reduceStock(Integer id) throws Exception {InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/product_" + id);InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();try {// ...writeLock.acquire();orderService.reduceStock(id);} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {writeLock.release();}return "ok:" + port;}
maxlease :
不同点:判断加锁逻辑上
源码
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData){lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);//写锁与互斥锁是一样的writeMutex = new InternalInterProcessMutex(client,basePath,WRITE_LOCK_NAME,lockData,1,new SortingLockInternalsDriver(){@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception{return super.getsTheLock(client, children, sequenceNodeName, maxLeases);}});//读锁readMutex = new InternalInterProcessMutex(client,basePath,READ_LOCK_NAME,lockData,Integer.MAX_VALUE,new SortingLockInternalsDriver(){@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception{return readLockPredicate(children, sequenceNodeName);}});}
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length);this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() {public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {return super.getsTheLock(client, children, sequenceNodeName, maxLeases);}});this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() {public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName);}});}
选举,获取leader
代码示例
String appName = System.getProperty("appName");CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);LeaderSelectorDemo.curatorFramework = curatorFramework;curatorFramework.start();//这样才会建立链接,而不是只是个示例类//这个是获取成为leader后的回到LeaderSelectorListener listener = new LeaderSelectorListenerAdapter(){public void takeLeadership(CuratorFramework client) throws Exception{System.out.println(" I' m leader now . i'm , "+appName);}};LeaderSelector selector = new LeaderSelector(curatorFramework, "/cachePreHeat_leader", listener);selector.autoRequeue(); // not required, but this is behavior that you will probably expectselector.start();//这样才会会发起抢占leaderselector.hasLeadership();//判断当前是否是leader
类:
LeaderLatch
只有close才能释放leader
LeaderSelector
调用takeLeadership就能释放,让其他节点获取leader
幽灵节点
withProtection方式实现的
注册中心的使用
注册:创建节点信息
spring cloud集成了注册中心
spring cloud官网有的
流程:
- 启动时,在applicationName路径下创建临时子节点,以及相关数据
- 调用方通过applicationName获取所有子节点的信息,ip,端口等
- 调用方缓存在本地,同时监听临时节点的变化
- 如果applicationName下节点变化,就会重新请求重新本地缓存

Spring Cloud 生态也提供了Zookeeper注册中心的实现
maven
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zookeeper-discovery</artifactId></dependency>
配置文件:
spring.application.name=user-center
zookeeper 连接地址 ,
如果使用了 spring cloud zookeeper config这个配置应该配置在 bootstrap.yml/bootstrap.properties中 spring.cloud.zookeeper.connect-string=192.168.109.200:2181
将本服务注册到zookeeper,如果不希望自己被发现可以配置为false, 默认为 true spring.cloud.zookeeper.discovery.register=true
