群友分享的一个 Elasticsearch(ES)中的 KeyedLock 工具类(按 key 加锁的可重入锁)
/**
* @author tys
* @version 1.0
* @date 2022/8/18 5:00 PM
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A registry of locks that are addressed by key.
 * <p>
 * The lock for a key is created the first time that key is acquired and is removed from the
 * registry again as soon as its usage count drops back to zero. Removing unused locks keeps the
 * registry from growing without bound.
 * <p>
 * Note: the locks are reentrant.
 *
 * @param <T> the key type
 */
public final class KeyedLock<T> {

    /** Registered locks by key. */
    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

    /** Fairness flag handed to every {@link KeyLock} this instance creates. */
    private final boolean fair;

    /**
     * Creates a new keyed lock.
     *
     * @param fair use fair locking, i.e. threads get the lock in the order they requested it
     */
    public KeyedLock(boolean fair) {
        this.fair = fair;
    }

    /**
     * Creates a non-fair keyed lock.
     */
    public KeyedLock() {
        this(false);
    }

    /**
     * Acquires the lock for the given key, blocking until it is available. Keys are compared by
     * their {@code equals} method, not by object identity. The same thread may acquire the lock for
     * a key several times.
     *
     * @param key the key to lock
     * @return a handle that releases the lock when it is closed
     */
    public Releasable acquire(T key) {
        for (;;) {
            final KeyLock existing = map.get(key);
            if (existing == null) {
                final ReleasableLock created = tryCreateNewLock(key);
                if (created != null) {
                    return created;
                }
                // another thread registered a lock for this key first; look it up again
                continue;
            }
            final int users = existing.count.get();
            // A count of zero means the lock is on its way out of the map, so it must not be
            // joined. The count is bumped before blocking on the lock itself.
            if (users > 0 && existing.count.compareAndSet(users, users + 1)) {
                existing.lock();
                return new ReleasableLock(key, existing);
            }
        }
    }

    /**
     * Tries to acquire the lock for the given key without blocking.
     *
     * @param key the key to lock
     * @return a handle that releases the lock when it is closed, or {@code null} if the lock
     *         could not be acquired
     */
    public Releasable tryAcquire(T key) {
        final KeyLock existing = map.get(key);
        if (existing == null) {
            return tryCreateNewLock(key);
        }
        if (!existing.tryLock()) {
            return null;
        }
        // We hold the lock now, so the usage count has to be incremented to match. A concurrent
        // blocking acquire may change the count between the read and the CAS, hence the retry;
        // once the count reads zero we give up.
        for (int users = existing.count.get(); users > 0; users = existing.count.get()) {
            if (existing.count.compareAndSet(users, users + 1)) {
                return new ReleasableLock(key, existing);
            }
        }
        // do not leave the lock in a locked state when giving up
        existing.unlock();
        return null;
    }

    /**
     * Creates an already-locked lock and tries to register it for the key.
     *
     * @return a handle for the new lock, or {@code null} if the key already had a lock registered
     */
    private ReleasableLock tryCreateNewLock(T key) {
        final KeyLock candidate = new KeyLock(fair);
        candidate.lock();
        final boolean registered = map.putIfAbsent(key, candidate) == null;
        return registered ? new ReleasableLock(key, candidate) : null;
    }

    /**
     * Returns <code>true</code> iff the calling thread holds the lock for the given key.
     */
    public boolean isHeldByCurrentThread(T key) {
        final KeyLock existing = map.get(key);
        return existing != null && existing.isHeldByCurrentThread();
    }

    /**
     * Gives back one acquisition of {@code lock} and unregisters it when that was the last one.
     */
    private void release(T key, KeyLock lock) {
        assert lock == map.get(key);
        final int remaining = lock.count.decrementAndGet();
        lock.unlock();
        if (remaining == 0) {
            // conditional remove: only drops the entry if it still maps to this very lock
            map.remove(key, lock);
        }
        assert remaining >= 0 : remaining + " must be >= 0 but wasn't";
    }

    /**
     * Returns <code>true</code> if at least one key currently has a lock registered.
     */
    public boolean hasLockedKeys() {
        return !map.isEmpty();
    }

    /**
     * Handle returned to callers; closing it releases the lock once, further closes are no-ops.
     */
    private final class ReleasableLock implements Releasable {
        final T key;
        final KeyLock lock;
        final AtomicBoolean closed = new AtomicBoolean();

        private ReleasableLock(T key, KeyLock lock) {
            this.key = key;
            this.lock = lock;
        }

        @Override
        public void close() {
            if (!closed.compareAndSet(false, true)) {
                return; // already closed
            }
            release(key, lock);
        }
    }

    /**
     * A reentrant lock paired with a usage count that starts at one.
     */
    @SuppressWarnings("serial")
    private static final class KeyLock extends ReentrantLock {
        private final AtomicInteger count = new AtomicInteger(1);

        KeyLock(boolean fair) {
            super(fair);
        }
    }
}
重点总结:
- ConcurrentMap.putIfAbsent:用于 keylock 的创建,保证同一个 key 只会有一个线程创建成功
- AtomicInteger.compareAndSet:用于在同步阻塞获取锁的时候,先原子地把计数加一(仅当计数大于 0 时才会成功),成功的线程再调用 lock() 阻塞等待获取锁
- 释放锁的时候,根据 AtomicInteger 的值判定,如果等于 0 的时候,就从 map 中移除锁