前言
ReentrantReadWriteLock 表示可重入读写锁。读写锁的特点是读读之间不互斥,读写或写写之间是互斥的。
ReentrantReadWriteLock 实现了 ReadWriteLock 接口,只有 readLock 和 writeLock 这两个方法。锁的具体实现是由内部类 Sync 去完成,而 Sync 继承自 AbstractQueuedSynchronizer,所以 ReentrantReadWriteLock 底层实现基于 AQS。
除了 Sync 类之外,还有 ReadLock 和 WriteLock 这两个内部类,分别代表读锁和写锁。每个锁会包含 Sync 实例引用,在具体的方法上将读锁和写锁的行为区分开。
读写锁决策表
读写锁决策表 | 读 | 写 |
---|---|---|
读 | 是 | 否 |
写 | 否 | 否 |
ReentrantReadWriteLock 继承关系图
ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock();
public void lockInterruptibly() throws InterruptedException;
public boolean tryLock();
public void unlock();
public Condition newCondition();
}
写锁
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock();
public void lockInterruptibly() throws InterruptedException;
public boolean tryLock( );
public void unlock();
public Condition newCondition();
public boolean isHeldByCurrentThread();
public int getHoldCount();
}
Sync 继承关系图
Sync
HoldCounter
Sync 类内部存在两个内部类,分别为 HoldCounter 和 ThreadLocalHoldCounter,其中 HoldCounter 与读锁配套使用。
它为每一个线程保留一个读锁计数器,存储于 ThreadLocal,放入变量 cachedHoldCounter 中。
下面是它的代码:
static final class HoldCounter {
int count; // 初始值 0
// 基本类型,避免垃圾回收
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
count 记录读线程的数量,获取共享锁+1,释放共享锁-1。
tid 保存了当前线程的ID,该字段可以用来唯一标识一个线程。
ThreadLocalHoldCounter
它是 ThreadLocal 的子类,其初始值就是一个 HoldCounter 实例。
如果要将一个线程和对象绑定在一起只有 ThreadLocal 才能实现。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
Sync 的成员变量
// 版本序列号
private static final long serialVersionUID = 6317671515068378041L;
// 高 16 位为读锁,低 16 位为写锁
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 当前线程重入读锁数量。当读锁的数量为 0 时,会被移除
private transient ThreadLocalHoldCounter readHolds;
// 缓存的计数器。记录最近一条线程获取到的读锁数量
private transient HoldCounter cachedHoldCounter;
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
构造函数
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
读锁和写锁的数量
/** 读锁数量,将 state 无符号右移 16 位,得到读锁的线程数量,因为 state 的高 16 位表示读锁,对应的 16 位表示数量*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 写锁数量,将 state 和 (2^16 -1) 做与运算,其等效于将 state 摸上 2^16 。写锁数量由 state 的低 16 位表示*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
关于 ReentrantReadWriteLock 的 lock 方法
实例化 ReentrantReadWriteLock 后,其实调用了 Sync#acquire 相关方法。
// 构造函数
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 读锁 lock 方法
public void lock() {
sync.acquireShared(1);
}
// 构造函数
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 写锁 lock 方法
public void lock() {
sync.acquire(1);
}
tryAcquire 方法(尝试获取写锁)
如果获取锁成功,直接执行业务逻辑代码。
如果获取锁失败,则会加入 AQS 的双向队列中,并阻塞(如果允许阻塞的话),等待唤醒。
WriteLock#lock 方法实现的 Lock 接口,内部还是调用 AQS 的 acquire 方法请求锁:
public void lock() {
sync.acquire(1);
}
这个 acquire 方法是由 AQS 实现的,相当于一个模板方法。
如果第一次尝试获取锁失败,则创建一个独占锁节点并加入到 AQS 队列中。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire 流程图
tryAcquire 方法则由子类 Sync 去实现:
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取状态
int c = getState();
// 写线程数量
int w = exclusiveCount(c);
// 状态不为0
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果c!=0,而 w==0,说明有锁存在,但不是写锁,不允许获取独占锁
// 写线程数量为0或者当前线程没有占有独占资源
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 判断是否超过最高写线程数量
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置 AQS 状态
setState(c + acquires);
return true;
}
// 写线程是否应该被阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置独占线程
setExclusiveOwnerThread(current);
return true;
}
tryAcquire 的工作流程:
- 获取 state 状态,不为 0 说明有线程线程获取过锁。
- 判断写锁的数量是否为 0,为 0 或当前线程不是获得锁的线程返回 false;
- 如果写锁超过限制也返回 false;
- 上面都通过了则更新 state,返回 true;
如果没有线程获取锁,判断当前线程是否能被阻塞,如果被阻塞或更新状态失败,则不能获取到锁,反之则把当前线程作为独占锁的拥有者。
tryRelease 方法(尝试放弃写锁)
写锁在释放锁的时候更新了锁的重入次数。只有更新为 0 的时候真正释放,此时返回 true,并唤醒等待获取锁的线程。
unlock 方法:public void unlock() {
sync.release(1);
}
release 方法同理,也是 AQS 定义的模板方法,最后的逻辑还是由子类去实现。如果释放成功,则会唤醒下一个线程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁的逻辑在子类里:
protected final boolean tryRelease(int releases) {
// 独占锁,必须是持有锁的线程去释放
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 如果 state 减去 释放的数量等于 0,说明写锁已全部释放。否则当前线程还在重入阶段
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 修改状态
setState(nextc);
return free;
}
tryAcquireShared 方法(尝试获取读锁)
先看看获取读锁的 lock 方法:
public void lock() {
// 调用AQS中的请求共享锁方法
sync.acquireShared(1);
}
实际上调用的是 AQS 的方法:
public final void acquireShared(int arg) {
// 尝试获取共享锁,如果获取失败则
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
尝试获取读锁方法,先简单获取一次,成功则返回,失败则进行完整的获取:
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取锁状态
int c = getState();
// 如果独占锁数量不为0 并且 持有独占锁的线程不是当前线程,则直接失败返回
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 获取共享锁的数量
int r = sharedCount(c);
// 检查是否需要因为队列策略而阻塞,若不需要则检查共享锁数量是否达到最大值,都没有则CAS更新锁状态
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// 共享锁数量为0表示当前线程是第一个获取读锁的线程
if (r == 0) {
// 更新第一个读线程变量和数量
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果首个获取读锁的线程重复获取读锁时,直接重入并将计数器累加
firstReaderHoldCount++;
} else {
// 获取当前线程的计数器
HoldCounter rh = cachedHoldCounter;
// 如果计数器为空 或者 当前线程还没有创建计数器,则创建计数器并存放到readHolds中,即存放到ThreadLocal中
if (rh == null || rh.tid != getThreadId(current))
// 在ThreadLocal中创建
cachedHoldCounter = rh = readHolds.get();
// 如果当前线程的计数器已存在,且计数值为0,则将该计数器放到readHolds中
else if (rh.count == 0)
readHolds.set(rh);
// 锁重入次数累加
rh.count++;
}
return 1;
}
// 之前因为队列策略或更新锁失败后再通过下面方法进行完整地尝试获取锁
return fullTryAcquireShared(current);
}
fullTryAcquireShared 方法
完整地获取读锁的方法,作为 tryAcquireShared 方法因 CAS 获取锁失败后的处理。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 循环获取读锁
for (;;) {
int c = getState();
// 当前存在独占锁
if (exclusiveCount(c) != 0) {
// 并且非当前线程持有该独占锁,则直接返回-1
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 确保我们没有重新获取读锁定
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 共享数量超过最大时抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS更新锁状态,以下逻辑与tryAcquireShared类似
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
doAcquireShared 方法
如果存在独占锁,则需要执行将当前线程加入到 AQS 的 Sync Queue 中去,自旋一段时间后仍未获取到锁,则进行阻塞处理。
// AbstractQueuedSynchronizer#doAcquireShared
private void doAcquireShared(int arg) {
// 将当前线程包装为类型为 Node.SHARED 的节点,标示这是一个共享节点。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果新建节点的前一个节点,就是 Head,说明当前节点是 Sync 队 列中等待获取锁的第一个节点,
// 按照 FIFO 的原则,可以直接尝试获取锁。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取成功,需要将当前节点设置为 AQS 队列中的第一个节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里就是与独占功能区别最大的一点,共享功能可以让其他线程获取到读锁,一旦获取到读锁,则会进行传播
setHeadAndPropagate 方法
这个方法很重要,当前节点的前继节点是首节点,那么就把当前节点作为 Sync Queue 中新首节点。然后将此状态传播出去。
//AbstractQueuedSynchronizer
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录老节点,方便下面的检查
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 传播下去,唤醒下一个节点
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared 方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果当前节点是 SIGNAL 意味着,它正在等待一个信号,
// 或者说,它在等待转换状态,因此做两件事,1 是重置 waitStatus 标志位,2 是重置成功后, 唤醒下一个节点。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果本身头节点的 waitStatus 是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
// 意味着需要将状态向后一个节点传播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 防止新加入节点,更新了首节点
if (h == head) // loop if head changed
break;
}
}
tryReleaseShared 方法
先看看释放锁的方法:
public void unlock() {
sync.releaseShared(1);
}
实际上调用的是 AQS 的方法:
public final boolean releaseShared(int arg) {
// 尝试一次释放锁,成功则唤醒同步队列中的写锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
尝试释放锁的方法:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 首个读线程直接更新特有的计数器即可
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 走到这里,说明只有一个线程获取到了读锁,且只获取过一次读锁
if (firstReaderHoldCount == 1)
firstReader = null;
// 重入锁
else
firstReaderHoldCount--;
} else {
// 非首读线程则需要更新它的重入次数减1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 没有锁了将这个引用移除
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// CAS方式更新锁状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
总结
- ReentrantReadWriteLock 内部包含读锁和写锁,同一时刻只能一个线程拥有写锁,可以多个线程拥有读锁;
- 读锁和写锁都是可重入的,但有线程获取读锁后,其它线程不能获取读锁,防止更新后,其它线程不知情的情况;
- 内部锁基于 AQS 实现,存在读锁或写锁时,线程加入 AQS 同步队列阻塞。而读请求只有存在写锁的情况才加入 AQS 同步队列,和上一条同理;
- 使用 ThreadLocal 的子类为每个线程存放锁的重入次数,写锁使用 state 的低位记录写锁重入次数。