概述
类名 | 描述 |
---|---|
Sync | ReentrantReadWriteLock 内部类,一个抽象类,继承 AQS 类,实现了 AQS 绝大多数的方法,并暴露两个抽象方法让子类实现:① writeShouldBlock() ② readerShouldBlock() |
NonfairSync | Sync 实现类,实现非公平锁。writeShouldBlock() 方法直接返回 false。 |
FairSync | Sync 实现类,实现公平锁。无论 read 还是 write,都是调用 hasQueuedPredecessors() 方法 |
ReadLock | Lock 接口的实现类。内部方法都是委托 ReentrantReadWriteLock 对象实现 |
WrietLock | Lock 接口的实现类。内部方法都是委托 ReentrantReadWriteLock 对象实现 |
ReadLock 和 WriteLock 都是使用同一个 Sync 子类实现,但是为什么一个 AQS 类可以既实现共享模式,又能实现独占模式呢?
AQS 的精髓就在于对 state 的值是如何定义的。
锁的类型 | state 值 | 描述 |
---|---|---|
独占模式 | 0 | 线程可以获得锁 |
1 | 锁被其它线程占用,当前线程只能插入阻塞队列 | |
共享模式 |
总的来说,独占模式和共享模式对 state 操作完全不一样,那 ReentrantReadWrite 是如何实现在一个 AQS 实例中实现两种锁模式,答案是将 state 这个 int 类型拆分成两部分,高 16 位用于共享模式,低 16 位用于独占模式。
源码分析
内部类:Sync
我们先瞅瞅 ReentrantReadWrite 内部类 Sync 的重要变量:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
// 将 int 的 state 拆分成两部分,低 16 位表示独占模式,即写锁线程持有数量
// 高 16 位表示共享模式,即读锁线程持有数量
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 取高 16 位值,返回共享锁(读锁)的获取次数(包括重入次数)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 取低 16 位值,返回独占锁(写锁)的获取次数(包括重入次数)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 记录每个线程持有读锁数量(读锁重入),每个线程都有一个对应的实例对象
static final class HoldCounter {
// 持有读锁的次数
int count = 0;
// 线程ID
final long tid = getThreadId(Thread.currentThread());
}
// 使用 ThreadLocal 存储这个变量值
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 从这个对象中可以获取当前线程持有读锁的数量
private transient ThreadLocalHoldCounter readHolds;
/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*
* <p>Can outlive the Thread for which it is caching the read
* hold count, but avoids garbage retention by not retaining a
* reference to the Thread.
*
* <p>Accessed via a benign data race; relies on the memory
* model's final field and out-of-thin-air guarantees.
*/
/**
* 缓存最后一个成功获得读锁的线程的读锁重入次数
* 无论哪个线程获得读锁,都会设置这个变量。为什么不直接从ThreadLocal获取呢?
* 基于这么一个事实:实际上,一个线程获得读锁后并很快释放,所以在当前线程获取读锁这段时间,
* 其它线程大概率不会获取读锁,因此,这个缓存可以提高最后一个线程操作的效率,节省从ThreadLocal查找的时间
*/
private transient HoldCounter cachedHoldCounter;
/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.
*/
// 缓存首个获取读锁的线程(且未释放),以及它所持有的读锁的数量
// 这允许追踪无竞争读锁的读次数非常容易
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
// 初始化 readHolds
readHolds = new ThreadLocalHoldCounter();
// 为了保证 readHolds 对其它线程可见,相当于做了一道写屏障
setState(getState());
}
// ...
}
- int 类型的 state 变量被拆分成两部分
- 高 16 位表示读锁的获取次数,包含重入次数。当线程获得读锁则
+1
,释放读锁-1
,重入也+1
。 - 低 16 位表示写锁的获取次数,包含重入次数。因为读锁是独占锁,所以
>1
的值表示线程重入次数。
- 高 16 位表示读锁的获取次数,包含重入次数。当线程获得读锁则
- 每个线程都有
ThreadLocal<HoldCounter>
本地变量,里面存储当前线程获取读锁的次数。因为高 16 位的 state 值包含重入和线程获取读锁的次数,因此使用每个线程都拥有HoldCounter
变量来区分是锁重入还是线程获取的读锁。 cacheHoldCounter
具有缓存功能。缓存最后一次获取读锁的线程的HoldCounter
,这是基于每个线程获得读锁到释放读锁这一过程是非常短暂的,较少出现并发情况,所以缓存最后一次获取读锁的线程以减少线程查询ThreadLocal
过程,从而提高性能。firstReader
和firstReaderHoldCount
其实也是充当缓存作用。记录首个获取读锁且未释放读锁的线程。这两个变量的目的在于使得在读锁不产生竞争的情况下,记录读锁重入次数这一操作非常快捷。获取读锁:ReadLock#lock
尝试获得读锁。当写锁没有被其它线程持有时,这个方法会立即返回。 ```java // java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock public void lock() { sync.acquireShared(1); }
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared // 这个就是一个模板类,子类需要根据实际需求实现自己的tryAcquireShared方法 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
`tryAcquireShared(arg)` 方法返回值:
1. 如果 `< 0` 表示没有获得共享锁(这里就是读锁),意味此次尝试失败。`> 0` 意味着获得共享锁,意味着此次尝试成功。
1. 如果尝试获得锁操作失败,那么进入 `doAcquireShared(arg)` 方法,这个方法由 `AQS` 实现,它会将当前线程封装为 Node 对象并插入阻塞队列的末尾,阻塞当前线程。然后等待前驱节点唤醒。线程被唤醒后,依旧调用 `tryAcquireShared(arg)` 方法尝试获得锁。如果尝试失败,依旧阻塞,否则退出 `for(;;)` 无限循环,并返回。
对于公平和非公平来说,`tryAcquireShared(arg)` 在 `ReentrantReadWriteLock` 并没有做区分:
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// #1 首先判断是否有其它线程持有「写锁」,如果持有,显然当前线程是无法获得「读锁」的。
// 当前,还有一种情况:当前线程已经持有「写锁」,它也可以获得「读锁」。
// 当前线程在持有「写锁」的情况下,是可以允许获得「读锁」的。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// #2 如果都不满足,直接返回 -1,后续会将当前线程封装为Node节点并插入到「阻塞队列」的末尾
return -1;
// #2 获取读锁的获取次数
int r = sharedCount(c);
// #3 readerShouldBlock():读锁获取是否需要阻塞。公平和非公平有不同实现方法,
// 在非公平模式下,如果阻塞队列不为空,会判断首个线程是否准备获取写锁,如果是,则返回false,否则返回true。
// 如果返回true,会直接调用fullTryAcquireShared获取共享读锁
if (!readerShouldBlock() &&
// #4 判断是否会溢出,其实数量很大的,2^16 - 1
r < MAX_COUNT &&
// #5 通过CAS更新 state 值:将高16位+1,如果成功,意味着成功获得读锁。
compareAndSetState(c, c + SHARED_UNIT)) {
// #6 走到这里,说明当前线程已成功获得「读锁」啦
// 下面就是更新相关缓存
// #7 如果 r=0,说明是首个获得「读锁」的线程,需要重置 firstReader
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// #8 首个线程重入了
firstReaderHoldCount++;
} else {
// #9 更新 cachedHoldCounter,这个类是缓存最后一个获得读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// #10 如果缓存的是其它线程,更新为当前线程的HoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// #11 意味着 cacheHolderCounter 为当前线程,但是 count = 0
// 更新当前线程ThreadLocal的值。
readHolds.set(rh);
// #12 count + 1
rh.count++;
}
// #13 返回 1 表示成功获得「读锁」
return 1;
}
// #14 走到这里,说明当前线程由于某些原因无法获得「读锁」,需要
return fullTryAcquireShared(current);
}
线程走到 #14
代码处,需要满足以下情况:
readerShouldBlock()
返回 false,这里会分公平锁和非公平锁之分。- 公平模式:调用
hasQueuedPredecessors()
判断阻塞队列中是否有其它元素在等待锁。 - 非公平模式:调用
apparentlyFirstQueuedIsExclusive
判断阻塞队列中 head 的第一个后继节点是否需要获取写锁。如果是,让这个写锁先来,避免写锁饥饿。这里,读锁对写锁在非公平模式下做了让步。
- 公平模式:调用
-
fullTryAcquireShared
来到这个方法的原因之一可能是 CAS 失败,如果是因为这样就要使得当前线程进入阻塞队列,消耗未免过大。所以这个方法其实是给 CAS 失败的线程一次重生的机会。
如果有线程持有写锁
- 重入线程,CAS 设置 state 的值。
- 非重入线程,返回
-1
,表示获取读锁失败。
没有线程持有写锁(
exclusiveCount(c) == 0
)readerShouldBlock()
返回true
:- 如果是首个获得读锁的线程,放过。
- 非首个获得读锁线程, ```java // java.util.concurrent.locks.ReentrantReadWriteLock.Sync#fullTryAcquireShared final int fullTryAcquireShared(Thread current) { /*
- This code is in part redundant with that in
- tryAcquireShared but is simpler overall by not
- complicating tryAcquireShared with interactions between
- retries and lazily reading hold counts. */ HoldCounter rh = null;
// #1 死循环 for (;;) { // #2 获取 state 状态 int c = getState();
// #3 如果其它线程持有写锁,并且判断当前线程是否为持有写锁的线程 // 如果不是,返回 -1,锁写是独占锁 if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock.
} else if (readerShouldBlock()) {
// #4 readerShouldBlock() = true 且没有线程持有「写锁」 // Make sure we're not acquiring read lock reentrantly // 重入获得读锁 // firstReader 线程读锁重入 // 非公平模式下,readerShouldBlock 返回 true,说明阻塞队列不为空, if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 这一段代码目的是处理读锁重入。为了确保读锁重入操作能成功,而非 // 被重新放入阻塞队列中等待。 // 还需要注意对ThreadLocal变量的处理,有一个remove()操作 if (rh == null) { // 获取最近缓存的HoldCounter rh = cachedHoldCounter; // #5 如果 cachedHoldCounter 缓存的非当前线程或根本没有缓存 // 从ThreadLocal获取当前线程的HoldCounter并更新 if (rh == null || rh.tid != getThreadId(current)) { // 从当前线程的ThreadLocal获取HoldCounter对象 rh = readHolds.get(); // #6 如果 count=0,属于上一行代码初始化造成,执行 remove // 避免内存泄漏 if (rh.count == 0) readHolds.remove(); } } // #7 将当前线程放入阻塞队列中 if (rh.count == 0) return -1; }
} if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// #8 CAS 更新 if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) { // #9 CAS成功,且发现 state = 0,就把当前线程设置为 firstReader // 表示首个获取读锁的线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // #10 将 cachedHoldCounter 更新为当前线程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } // #11 返回 >0 的数,表示成功获得读锁 return 1;
释放读锁
```java // java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock public void unlock() { sync.releaseShared(1); }
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
<a name="SO06d"></a>
### tryReleaseShared
1. 通过缓存来加速相关变量的读取操作。
1. CAS 更新 state 的值。
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// #1 缓存加速:当前线程是否为首个获得读锁的线程
// 直接更新缓存即可
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// #2 当前线程非首个获得读锁的线程,更新"二级缓存" cachedHoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// cacheHoldCounter不是当前线程的缓存,从ThreadLocal中读取
rh = readHolds.get();
// #3 这里就是最新值
int count = rh.count;
if (count <= 1) {
// #4 表示已不再持有读锁,直接remove,避免内存泄漏
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// #5 更新 count 值
--rh.count;
}
// #6 CAS 更新 state
for (;;) {
int c = getState();
// #7 相当于是 state 高 16 位 - 1
int nextc = c - SHARED_UNIT;
// #8 CAS 更新
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// #9 true:读/写锁都没有线程占用,主要是帮助后继线程唤醒获取「写锁」的线程
// 防止写饥饿
return nextc == 0;
}
}
获取写锁
我们知道,当一个线程拥有写锁时,其它线程不能拥有读锁或写锁。它是一把独占锁。
如果有其它线程持有读锁或写锁,那么当前线程需要进入阻塞队列等待前驱节点唤醒。
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 如果获取独占锁失败,则线程被包装为Node对象并插入阻塞队列末尾
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
- 如果 read count 不等于 0 或 write count 不等于,并且非重入线程,本次尝试获得锁失败。
- 如果计数已经饱和,本次尝试获取锁失败。
如果满足线程重入或满足入队策略,那么当前线程有资格获得锁。 ```java // java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire protected final boolean tryAcquire(int acquires) { /*
- Walkthrough:
- If read count nonzero or write count nonzero
- and owner is a different thread, fail.
- If count would saturate, fail. (This can only
- happen if count is already nonzero.)
- Otherwise, this thread is eligible for lock if
- it is either a reentrant acquire or
- queue policy allows it. If so, update state
and set owner. */ // #1 获取当前线程 Thread current = Thread.currentThread();
// #2 获取当前AQS的state int c = getState();
// #3 获取写锁获取次数 int w = exclusiveCount(c);
// c == 0:意味着读锁和写锁都没被其它线程所持有 // c != 0 意味着至少有一个线程持有写锁或读锁 // c !=0 && w == 0 意味着写锁可用,但是有线程(也包括自己)持有「读锁」。 // c != 0 && w != 0 意味着写锁不可用,其它线程(也包括自己)持有「写锁」。 // 要判断当前是否可拥有锁,在前面状态的基础上,可能还需要判断是否当前线程是否重入 if (c != 0) { // #5 读锁被其它线程占用了,无法获得写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false;
// #6 溢出 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error(“Maximum lock count exceeded”);
// #7 当前线程为重入线程,更新 state setState(c + acquires);
// #8 true:成功获得「写锁」 return true; }
// #9 c == 0,writerShouldBlock() 返回 false,就可以CAS更新 state 值 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
// #10 CAS 更新成功,当前线程成功获得「写锁」,设置独占线程 setExclusiveOwnerThread(current);
// #11 return true return true; }
// 返回当前独占锁的线程数量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
<a name="hGKVJ"></a>
### writerShouldBlock
公平模式和非公平模式在这里是有区别的:
1. 非公平模式:直接返回 `false`。意味着当前线程直接去争抢**写锁**。
1. 公平模式:需要判断阻塞队列是否有其它线程正在排队。
```java
// java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
// 非公平模式
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// 公平模式
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
/**
* ① 空队列,返回 false
* ② 首个等待线程不是当前线程
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
释放写锁
由于写锁是独占锁,因此,释放过程比较简单。当线程完全释放锁后,才会唤醒后驱节点。
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
// #1 当线程完全释放写锁时,才会返回true
if (tryRelease(arg)) {
// #2 进入到这里,意味着当前线程完全释放写锁,那么可以将唤醒后驱节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
/**
* true:当前线程完全释放「写锁」
* false:当前线程并未完全释放「写锁」
*/
protected final boolean tryRelease(int releases) {
// #1 判断当前线程是否独占锁对象,如果不是,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// #2 计算修改后的state值
int nextc = getState() - releases;
// #3 判断是否完全释放锁(可重入)
boolean free = exclusiveCount(nextc) == 0;
// #4 如果是的话,就将独占线程设置为null
if (free)
setExclusiveOwnerThread(null);
// #5 更新 state 的值
setState(nextc);
// #6 返回判断是否已完全释放锁
return free;
}
// c & 0..0 1...11111(15个1)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
锁降级
在非公平模式下获取读锁,线程会给写锁做一些让步:判断首个等待线程是否获取写锁。如果是,则乖乖排队,不会争抢。
而锁降级是指将持有写锁的线程,去获取读锁的这一行为称为锁降级(Lock downgrading)。此刻,线程既拥有了写锁,又拥有了读锁。
但是,逆向是不可以发生了,即当前线程在拥有读锁的情况下,去获取写锁。这会造成死锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 看下这里返回 false 的情况:
// c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
// 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
...
}
...
}
如果线程 t 先获得读锁,然后调用 tryAcquire()
尝试获得写锁,这个方法会返回 false。这就导致当前线程会加入阻塞队列并被阻塞,自己将自己休眠。但是由于写锁没有释放,其它线程无法获得读锁,也会加入阻塞队列。这就导致线程 t 永远无法被其它线程所唤醒。