1. 读写锁详解
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:
线程进入读锁的前提条件:
没有其他线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
2. 源码解读
2.1 整体结构
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {/** 读锁 */private final ReentrantReadWriteLock.ReadLock readerLock;/** 写锁 */private final ReentrantReadWriteLock.WriteLock writerLock;final Sync sync;/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */public ReentrantReadWriteLock() {this(false);}/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}/** 返回用于写入操作的锁 */public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }/** 返回用于读取操作的锁 */public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }abstract static class Sync extends AbstractQueuedSynchronizer {}static final class NonfairSync extends Sync {}static final class FairSync extends Sync {}public static class ReadLock implements Lock, java.io.Serializable {}public static class WriteLock implements Lock, java.io.Serializable {}}
2.2 写锁的获取与释放
看下WriteLock类中的lock和unlock方法:
public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}
2.2.1 写锁的获取
protected final boolean tryAcquire(int acquires) {//当前线程Thread current = Thread.currentThread();//获取状态int c = getState();//写线程数量(即获取独占锁的重入数)int w = exclusiveCount(c);//当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁if (c != 0) {// 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;// 如果写锁状态不为0且写锁没有被当前线程持有返回falseif (w == 0 || current != getExclusiveOwnerThread())return false;//判断同一线程获取写锁是否超过最大次数(65535),支持可重入if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//更新状态//此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。setState(c + acquires);return true;}//到这里说明此时c=0,读锁和写锁都没有被获取//writerShouldBlock表示是否阻塞if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;//设置锁为当前线程所有setExclusiveOwnerThread(current);return true;}
从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程如下:
2.2.2 写锁的释放
protected final boolean tryRelease(int releases) {//若锁的持有者不是当前线程,抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();//写锁的新线程数int nextc = getState() - releases;//如果独占模式重入数为0了,说明独占模式被释放boolean free = exclusiveCount(nextc) == 0;if (free)//若写锁的新线程数为0,则将锁的持有者设置为nullsetExclusiveOwnerThread(null);//设置写锁的新线程数//不管独占模式是否被释放,更新独占重入数setState(nextc);return free;}
2.3 读锁的获取和释放
2.3.1 读锁的获取
protected final int tryAcquireShared(int unused) {// 获取当前线程Thread current = Thread.currentThread();// 获取状态int c = getState();//如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 读锁数量int r = sharedCount(c);/** readerShouldBlock():读锁是否需要等待(公平锁原则)* r < MAX_COUNT:持有线程小于最大数(65535)* compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态*/// 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中if (r == 0) { // 读锁数量为0// 设置第一个读线程firstReader = current;// 读线程占用的资源数为1firstReaderHoldCount = 1;} else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入// 占用资源数加1firstReaderHoldCount++;} else { // 读锁数量不为0并且不为当前线程// 获取计数器HoldCounter rh = cachedHoldCounter;// 计数器为空或者计数器的tid不为当前正在运行的线程的tidif (rh == null || rh.tid != getThreadId(current))// 获取当前线程对应的计数器cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0) // 计数为0//加入到readHolds中readHolds.set(rh);//计数+1rh.count++;}return 1;}return fullTryAcquireShared(current);}

fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) { // 无限循环// 获取状态int c = getState();if (exclusiveCount(c) != 0) { // 写线程数量不为0if (getExclusiveOwnerThread() != current) // 不为当前线程return -1;} else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) { // 当前线程为第一个读线程// assert firstReaderHoldCount > 0;} else { // 当前线程不为第一个读线程if (rh == null) { // 计数器不为空//rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tidrh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功if (sharedCount(c) == 0) { // 读线程数量为0// 设置第一个读线程firstReader = current;//firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}
在tryAcquireShared函数中,如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。其逻辑与tryAcquireShared逻辑类似,不再累赘。
2.3.2 读锁的释放
protected final boolean tryReleaseShared(int unused) {// 获取当前线程Thread current = Thread.currentThread();if (firstReader == current) { // 当前线程为第一个读线程// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1) // 读线程占用的资源数为1firstReader = null;else // 减少占用的资源firstReaderHoldCount--;} else { // 当前线程不为第一个读线程// 获取缓存的计数器HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid// 获取当前线程对应的计数器rh = readHolds.get();// 获取计数int count = rh.count;if (count <= 1) { // 计数小于等于1// 移除readHolds.remove();if (count <= 0) // 计数小于等于0,抛出异常throw unmatchedUnlockException();}// 减少计数--rh.count;}for (;;) { // 无限循环// 获取状态int c = getState();// 获取状态int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc)) // 比较并进行设置// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}
此方法表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,如果计数器的计数count小于等于1,则移除当前线程对应的计数器,如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。无论何种情况,都会进入无限循环,该循环可以确保成功设置状态state。
在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。
3. 总结
通过上面的源码分析,我们可以发现一个现象:
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
综上:
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。
