分布式锁
官网:
https://curator.apache.org/getting-started.html
非公平锁
所有请求在请求同一个节点
无法控制先来后到顺序的
不断请求锁的性能消耗
养群效应,都在抢资源
公平锁
临时顺序节点
判断最小的节点就是获得者,
- 请求会在lock创建临时顺序节点
- 判断是不是当前节点最小的节点
- 是,获得suo
- 不是,监听前面一个节点即可watch
- 获取的锁,处理业务释放说,清除节点
- 后一个节点会会收到通知
避免竞争,缓解压力,对加锁排队,公平锁
节点异常删除的场景
幽灵节点
创建成功,返回失败创建了一个客户端感知不到的节点
解决方案:protection模式
自带个ID, 去判断是否已存在
示例代码
public Object reduceStock(Integer id) throws Exception {
//互斥锁
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);
try {
// 获取锁
interProcessMutex.acquire();
//处理业务逻辑
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw e;
}
}finally {
//释放锁
interProcessMutex.release();
}
return "success";
}
源码分析
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
//获取锁
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering重入
lockData.lockCount.incrementAndGet();
return true;
}
//把锁跟当前线程关联
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
//容器节点:没有子节点会自动被删除
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
类
InterProcessMutex
过程
网络拿到的所有子节点的结果不是顺序的
独占锁
非独占锁
读写锁:
根据节点前缀判断
示例code
public Object reduceStock(Integer id) throws Exception {
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/product_" + id);
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
try {
// ...
writeLock.acquire();
orderService.reduceStock(id);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw e;
}
}finally {
writeLock.release();
}
return "ok:" + port;
}
maxlease :
不同点:判断加锁逻辑上
源码
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
//写锁与互斥锁是一样的
writeMutex = new InternalInterProcessMutex
(
client,
basePath,
WRITE_LOCK_NAME,
lockData,
1,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
//读锁
readMutex = new InternalInterProcessMutex
(
client,
basePath,
READ_LOCK_NAME,
lockData,
Integer.MAX_VALUE,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length);
this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
});
this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName);
}
});
}
选举,获取leader
代码示例
String appName = System.getProperty("appName");
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);
LeaderSelectorDemo.curatorFramework = curatorFramework;
curatorFramework.start();//这样才会建立链接,而不是只是个示例类
//这个是获取成为leader后的回到
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
public void takeLeadership(CuratorFramework client) throws Exception
{
System.out.println(" I' m leader now . i'm , "+appName);
}
};
LeaderSelector selector = new LeaderSelector(curatorFramework, "/cachePreHeat_leader", listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();//这样才会会发起抢占leader
selector.hasLeadership();//判断当前是否是leader
类:
LeaderLatch
只有close才能释放leader
LeaderSelector
调用takeLeadership就能释放,让其他节点获取leader
幽灵节点
withProtection方式实现的
注册中心的使用
注册:创建节点信息
spring cloud集成了注册中心
spring cloud官网有的
流程:
- 启动时,在applicationName路径下创建临时子节点,以及相关数据
- 调用方通过applicationName获取所有子节点的信息,ip,端口等
- 调用方缓存在本地,同时监听临时节点的变化
- 如果applicationName下节点变化,就会重新请求重新本地缓存
Spring Cloud 生态也提供了Zookeeper注册中心的实现
maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
配置文件:
spring.application.name=user-center
zookeeper 连接地址 ,
如果使用了 spring cloud zookeeper config这个配置应该配置在 bootstrap.yml/bootstrap.properties中 spring.cloud.zookeeper.connect-string=192.168.109.200:2181
将本服务注册到zookeeper,如果不希望自己被发现可以配置为false, 默认为 true spring.cloud.zookeeper.discovery.register=true