解决线程安全问题使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然 ReentrantLock 满足不了这个需求,所以 ReentrantReadWriteLock 应运而生。ReentrantReadWriteLock 采用读写分离的策略,允许多个线程可以同时获取读锁。
类图结构
为了了解 ReentrantReadWriteLock 的内部构造,我们先看下它的类图结构,如图 6-7 所示。
读写锁的内部维护了一个 ReadLock 和一个 WriteLock,它们依赖 Sync 实现具体功能。而 Sync 继承自 AQS,并且也提供了公平和非公平的实现。下面只介绍非公平的读写锁实现。我们知道 AQS 中只维护了一个 state 状态,而 ReentrantReadWriteLock 则需要维护读状态和写状态,一个 state 怎么表示写和读两种状态呢?ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数。
static final int SHARED_SHIFT = 16;
//共享锁(读锁)状态单位值 65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//共享锁线程最大个数 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) -1;
//排它锁(写锁)掩码,二进制 ,15 个 1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) -1;
/** 返回读锁线程数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其中 firstReader 用来记录第一个获取到读锁的线程,firstReaderHoldCount 则记录第一个获取到读锁的线程获取读锁的可重入次数。cachedHoldCounter 用来记录最后一个获取读锁的线程获取读锁的可重入次数。
static final class HoldCounter {
int count = 0;
//线程 id
final long tid = getThreadId(Thread.currentThread());
}
readHolds 是 ThreadLocal 变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。ThreadLocalHoldCounter 继承了 ThreadLocal,因而 initialValue 方法返回一个 HoldCounter 对象。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
写锁的获取与释放
在 ReentrantReadWriteLock 中写锁使用 WriteLock 来实现。
1.void lock()
写锁是个独占锁,某时只有一个线程可以获取该锁。如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。另外,写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单地把可重入次数加 1 后直接返回。
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// sync 重写的 tryAcquire 方法
if (! tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如以上代码所示,在 lock()内部调用了 AQS 的 acquire 方法,其中 tryAcquire 是 ReentrantReadWriteLock 内部的 sync 类重写的,代码如下。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//(1)c! =0 说明读锁或者写锁已经被某线程获取
if (c ! = 0) {
//(2)w=0 说明已经有线程获取了读锁,w! =0 并且当前线程不是写锁拥有者,则返回
// false
if (w == 0 || current ! = getExclusiveOwnerThread())
return false;
//(3)说明当前线程获取了写锁,判断可重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error(「Maximum lock count exceeded」);
//(4)设置可重入次数(1)
setState(c + acquires);
return true;
}
//(5)第一个写线程获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
在代码(1)中,如果当前 AQS 状态值不为 0 则说明当前已经有线程获取到了读锁或者写锁。在代码(2)中,如果 w==0 说明状态值的低 16 位为 0,而 AQS 状态值不为 0,则说明高 16 位不为 0,这暗示已经有线程获取了读锁,所以直接返回 false。
而如果 w!=0 则说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,如果不是则返回 false。
执行到代码(3)说明当前线程之前已经获取到了该锁,所以判断该线程的可重入次数是不是超过了最大值,是则抛出异常,否则执行代码(4)增加当前线程的可重入次数,然后返回 true.
如果 AQS 的状态值等于 0 则说明目前没有线程获取到读锁和写锁,所以执行代码(5)。其中,对于 writerShouldBlock 方法,非公平锁的实现为
final boolean writerShouldBlock() {
return false; // writers can always barge
}
如果代码对于非公平锁来说总是返回 false,则说明代码(5)抢占式执行 CAS 尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回 true,否则返回 false。
公平锁的实现为
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
这里还是使用 hasQueuedPredecessors 来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限,直接返回 false。
2.void lockInterruptibly()
类似于 lock()方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的 interrupt()方法中断了当前线程时,当前线程会抛出异常 InterruptedException 异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
3.boolean tryLock()
尝试获取写锁,如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回 true。如果当前已经有其他线程持有写锁或者读锁则该方法直接返回 false,且当前线程并不会被阻塞。如果当前线程已经持有了该写锁则简单增加 AQS 的状态值后直接返回 true。
public boolean tryLock( ) {
return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c ! = 0) {
int w = exclusiveCount(c);
if (w == 0 || current ! = getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (! compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
如上代码与 tryAcquire 方法类似,这里不再讲述,不同在于这里使用的是非公平策略。
4.boolean tryLock(long timeout, TimeUnit unit)
与 tryAcquire()的不同之处在于,多了超时时间参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回 false。另外,该方法会对中断进行响应,也就是当其他线程调用了该线程的 interrupt()方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
5.void unlock()
尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS 状态值减 1,如果减去 1 后当前状态值为 0 则当前线程会释放该锁,否则仅仅减 1 而已。如果当前线程没有持有该锁而调用了该方法则会抛出 IllegalMonitorStateException 异常,代码如下。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//调用 ReentrantReadWriteLock 中 sync 实现的 tryRelease 方法
if (tryRelease(arg)) {
//激活阻塞队列里面的一个线程
Node h = head;
if (h ! = null && h.waitStatus ! = 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//(6)看是否是写锁拥有者调用的 unlock
if (! isHeldExclusively())
throw new IllegalMonitorStateException();
//(7)获取可重入值,这里没有考虑高 16 位,因为获取写锁时读锁状态值肯定为 0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//(8)如果写锁可重入值为 0 则释放锁,否则只是简单地更新状态值
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
在如上代码中,tryRelease 首先通过 isHeldExclusively 判断是否当前线程是该写锁的持有者,如果不是则抛出异常,否则执行代码(7),这说明当前线程持有写锁,持有写锁说明状态值的高 16 位为 0,所以这里 nextc 值就是当前线程写锁的剩余可重入次数。代码(8)判断当前可重入次数是否为 0,如果 free 为 true 则说明可重入次数为 0,所以当前线程会释放写锁,将当前锁的持有者设置为 null。如果 free 为 false 则简单地更新可重入次数。
读锁的获取与释放
ReentrantReadWriteLock 中的读锁是使用 ReadLock 来实现的。
1.void lock()
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS 的状态值 state 的高 16 位的值会增加 1,然后方法返回。否则如果其他一个线程持有写锁,则当前线程会被阻塞。
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
//调用 ReentrantReadWriteLock 中的 sync 的 tryAcquireShared 方法
if (tryAcquireShared(arg) < 0)
//调用 AQS 的 doAcquireShared 方法
doAcquireShared(arg);
}
在如上代码中,读锁的 lock 方法调用了 AQS 的 acquireShared 方法,在其内部调用了 ReentrantReadWriteLock 中的 sync 重写的 tryAcquireShared 方法,代码如下。
protected final int tryAcquireShared(int unused) {
//(1)获取当前状态值
Thread current = Thread.currentThread();
int c = getState();
//(2)判断是否写锁被占用
if (exclusiveCount(c) ! = 0 &&
getExclusiveOwnerThread() ! = current)
return -1;
//(3)获取读锁计数
int r = sharedCount(c);
//(4)尝试获取锁,多个读线程只有一个会成功,不成功的进入 fullTryAcquireShared 进行重试
if (! readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//(5)第一个线程获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//(6)如果当前线程是第一个获取读锁的线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//(7)记录最后一个获取读锁的线程或记录其他线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid ! = current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//(8)类似 tryAcquireShared,但是是自旋获取
return fullTryAcquireShared(current);
}
如上代码首先获取了当前 AQS 的状态值,然后代码(2)查看是否有其他线程获取到了写锁,如果是则直接返回-1,而后调用 AQS 的 doAcquireShared 方法把当前线程放入 AQS 阻塞队列。
如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。
否则执行代码(3),得到获取到的读锁的个数,到这里说明目前没有线程获取到写锁,但是可能有线程持有读锁,然后执行代码(4)。其中非公平锁的 readerShouldBlock 实现代码如下。
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) ! = null &&
(s = h.next) ! = null &&
!s.isShared() &&
s.thread ! = null;
}
如上代码的作用是,如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否达到了最大值。最后执行 CAS 操作将 AQS 状态值的高 16 位值增加 1。
代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。代码(7)使用 cachedHoldCounter 记录最后一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds 记录了当前线程获取读锁的可重入数。
如果 readerShouldBlock 返回 true 则说明有线程正在获取写锁,所以执行代码(8)。fullTryAcquireShared 的代码与 tryAcquireShared 类似,它们的不同之处在于,前者通过循环自旋获取。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (; ; ) {
int c = getState();
if (exclusiveCount(c) ! = 0) {
if (getExclusiveOwnerThread() ! = current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid ! = getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid ! = getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
2.void lockInterruptibly()
类似于 lock()方法,不同之处在于,该方法会对中断进行响应,也就是当其他线程调用了该线程的 interrupt()方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
3.boolean tryLock()
尝试获取读锁,如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回 true。如果当前已经有其他线程持有写锁则该方法直接返回 false,但当前线程并不会被阻塞。如果当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。其代码类似 tryLock 的代码,这里不再讲述。
4.boolean tryLock(long timeout, TimeUnit unit)
与 tryLock()的不同之处在于,多了超时时间参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果此时还没有获取到读锁则返回 false。另外,该方法对中断响应,也就是当其他线程调用了该线程的 interrupt()方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
5.void unlock()
public void unlock() {
sync.releaseShared(1);
}
如上代码具体释放锁的操作是委托给 Sync 类来做的,sync.releaseShared 方法的代码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中 tryReleaseShared 的代码如下。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
....
//循环直到自己的读计数-1, CAS 更新成功
for (; ; ) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如以上代码所示,在无限循环里面,首先获取当前 AQS 状态值并将其保存到变量 c,然后变量 c 被减去一个读计数单位后使用 CAS 操作更新 AQS 状态值,如果更新成功则查看当前 AQS 状态值是否为 0,为 0 则说明当前已经没有读线程占用读锁,则 tryReleaseShared 返回 true。然后会调用 doReleaseShared 方法释放一个由于获取写锁而被阻塞的线程,如果当前 AQS 状态值不为 0,则说明当前还有其他线程持有了读锁,所以 tryReleaseShared 返回 false。如果 tryReleaseShared 中的 CAS 更新 AQS 状态值失败,则自旋重试直到成功。
案例介绍
上节介绍了如何使用 ReentrantLock 实现线程安全的 list,但是由于 ReentrantLock 是独占锁,所以在读多写少的情况下性能很差。下面使用 ReentrantReadWriteLock 来改造它,代码如下。
public static class ReentrantLockList {
//线程不安全的 list
private ArrayList<String> array = new ArrayList<String>();
//独占锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
//添加元素
public void add(String e) {
writeLock.lock();
try {
array.add(e);
} finally {
writeLock.unlock();
}
}
//删除元素
public void remove(String e) {
writeLock.lock();
try {
array.remove(e);
} finally {
writeLock.unlock();
}
}
//获取数据
public String get(int index) {
readLock.lock();
try {
return array.get(index);
} finally {
readLock.unlock();
}
}
}
以上代码调用 get 方法时使用的是读锁,这样运行多个读线程来同时访问 list 的元素,这在读多写少的情况下性能会更好。
最后使用一张图(见图 6-8)来加深对 ReentrantReadWriteLock 的理解。
小结
本节介绍了读写锁 ReentrantReadWriteLock 的原理,它的底层是使用 AQS 实现的。ReentrantReadWriteLock 巧妙地使用 AQS 的状态值的高 16 位表示获取到读锁的个数,低 16 位表示获取写锁的线程的可重入次数,并通过 CAS 对其进行操作实现了读写分离,这在读多写少的场景下比较适用。