读写锁在同一时刻允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁,一个写锁,通过分离读锁和写锁,使得并发性相比一般的排它锁有了很大提升。
特性 | 说明 |
---|---|
公平性选择 | 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平 |
重进入 | 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁后能够再次获取写锁,也可以获取读锁。 |
锁降级 | 遵循获取写锁、获取读锁再释放写锁的顺序,写锁能够降级为读锁 |
特性 | 说明 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数。 |
int getReadHoldCount() | 返回当前线程获取读锁的次数 |
Boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
读写锁的实现
1、读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。
读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。通过位运算来得出读锁和写锁获取的次数。
/**这个非公平策略的同步器是写锁优先的,申请写锁时总是不阻塞。
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // 写线程总是可以突入
}
final boolean readerShouldBlock() {
/* 作为一个启发用于避免写线程饥饿,如果线程临时出现在等待队列的头部则阻塞,
* 如果存在这样的,则是写线程。
*/
return apparentlyFirstQueuedIsExclusive();
}
}
/*公平的 Sync,它的策略是:如果线程准备获取锁时,
* 同步队列里有等待线程,则阻塞获取锁,不管是否是重入
* 这也就需要tryAcqire、tryAcquireShared方法进行处理。
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
2、写锁的获取与释放
写锁是一个支持重入的排它锁。如果当前线程已经已经获取了写锁,则增加写状态。如果当前线程在获取锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在允许的其他读线程就无法感知到当前线程的操作。
lock
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1.如果读取计数非零或写入计数非零且所有者是不同的线程,则失败。
2.如果计数会饱和,那就失败了。 (只有在计数已经非零时才会发生这种情况。)
3.否则,如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁定。
如果是,请更新状态并设置所有者。
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//写锁获取次数等于0,或者当前线程不是排它锁拥有线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//写锁是否阻塞
if (writerShouldBlock() ||
//更新同步状态
!compareAndSetState(c, c + acquires))
return false;
//设置排它锁拥有线程
setExclusiveOwnerThread(current);
return true;
}
tryLock,尝试获取写锁,立即返回,不阻塞
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的释放,因为只允许一个线程获取写锁,所以锁的释放不需要考虑线程安全问题
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
3、读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总会被成功的获取,而所做的只是增加读状态
读锁的获取
protected final int tryAcquireShared(int unused) {
/*
*1.如果另一个线程持有写锁定,则失败。
2.否则,此线程符合锁定wrt状态,因此请询问是否应该因为队列策略而阻塞。 如果没有,请尝试通过CASing状态授予并更新count.Note该步骤不检查可重入获取,这被推迟到完整版本以避免在更典型的非重入情况下检查保持计数。
3.如果步骤2因为线程显然不符合条件或CAS失败或计数饱和而失败,则链接到具有完全重试循环的版本。
*/
Thread current = Thread.currentThread();
int c = getState();
//排它锁获取次数不为0,并且排它锁拥有线程不是当线程,返回失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//共享锁获取次数
int r = sharedCount(c);
//不阻塞、没有达到最大值并且同步状态修改成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//第一次获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//再次获取读锁,并且读锁拥有线程是当前线程
} else if (firstReader == current) {
firstReaderHoldCount++;
//再次获取读锁,当前线程不是读锁拥有线程
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* 此代码与tryAcquireShared中的代码部分冗余,
但总体上更简单,因为不会使tryAcquireShared与重试之间的交互和懒惰读取保持计数复杂化。
*/
HoldCounter rh = null;
//自旋
for (;;) {
int c = getState();
//排它锁获取次数
if (exclusiveCount(c) != 0) {
//排它锁拥有线程不是当前线程,返回
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//是否阻塞读锁
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//共享锁获取次数
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//共享状态修改成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
//第一次获取
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
读锁的释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//自旋更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
4、锁降级
锁降级是指写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再次获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住写锁,再获取到读锁,随后释放写锁的过程。