读写锁接口:
public interface ReadWriteLock {Lock readLock();//读锁Lock writeLock();//写锁}
构造器
public ReentrantReadWriteLock() {this(false);}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}
构造器可以得知,读写锁也分为公平锁和非公平锁
读锁和写锁都实现了Lock接口
并持有同个AQS同步器
读锁的实现
public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}....}
获取读锁
public void lock() {sync.acquireShared(1);//获取共享锁}
读锁获取的是个共享锁
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
同步器共享方法中,返回>=0获取成功,<0获取失败
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();//获取当前线程int c = getState();//获取stateif (exclusiveCount(c) != 0 &&//独占计数!=0,说明有线程占用读锁getExclusiveOwnerThread() != current)//这个线程非当前线程return -1;//返回-1 抢占共享锁失败int r = sharedCount(c);//获取共享计数if (!readerShouldBlock() &&//下个aqs下个被唤醒的不是写线程r < MAX_COUNT &&//允许最大的读线程compareAndSetState(c, c + SHARED_UNIT)) {//cas占有读锁if (r == 0) {//第一个占用锁的线程firstReader = current;//第一读的线程firstReaderHoldCount = 1;//第一个读的线程保持计数} else if (firstReader == current) {//第一个读的线程重入firstReaderHoldCount++;//计数++} else {HoldCounter rh = cachedHoldCounter;//获取最后一个读锁的线程计数器if (rh == null || rh.tid != getThreadId(current))//缓存为空或者 如果最后一个线程计数器不是当前线程cachedHoldCounter = rh = readHolds.get();//创建新的线程计数器else if (rh.count == 0)//缓存计数器无值readHolds.set(rh);rh.count++;}return 1;}//cas失败,继续获取return fullTryAcquireShared(current);}
exclusiveCount(c)判断独占锁
static final int SHARED_SHIFT = 16;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读写锁的状态都维护在state上。这是种按位切割使用,将变量切分成两部分,高16位表示读,低16位表示写。

当前同步状态表示一个线程已经获取了写锁,且重入两次,同时也连续获取了两次读锁。
如果没有线程占有独占锁,那么线程会判断读是否应该阻塞当前线程
readerShouldBlock()
非公平锁实现
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();//队列第一是是否是独占}final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&//获取队列头 不为空(s = h.next) != null &&//头的next节点不为空,及排在队列的一个节点!s.isShared() &&//状态不是共享状态。s.thread != null; //有线程}
非公平锁判断aqs排在第一的线程是写锁的的话则进行阻塞
公平锁实现
final boolean readerShouldBlock() {return hasQueuedPredecessors();}
public final boolean hasQueuedPredecessors() {Node t = tail; // Read fields in reverse initialization order 以相反的初始化顺序读取字段Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
公平锁只要是aqs里有值就直接阻塞,让队列里的线程先访问资源
如果判断不需要阻塞,则进行一次cas,stete高位+1.成功的话获得锁,并记录是否是firstRead,或者holdCounter
需要阻塞或者cas失败的话,进入fullTryAcquireShared(current)
fullTryAcquireShared(current)自旋获取锁
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {//通过自旋方式int c = getState();if (exclusiveCount(c) != 0) {//独占锁if (getExclusiveOwnerThread() != current)//非当前线程return -1;//获取失败// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {//读被阻塞。aqs队列排在第一的线程为写线程// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {//当前线程是第一个获取写的线程// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;//代表的是最后一个获取读锁的线程的计数器。if (rh == null || rh.tid != getThreadId(current)) {//如果最后一个线程计数器是 null 或者不是当前线程rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)//读锁满了throw new Error("Maximum lock count exceeded");//抛出异常if (compareAndSetState(c, c + SHARED_UNIT)) {//cas去获取线程if (sharedCount(c) == 0) {//第一个获取锁的线程firstReader = current;//记录第一线程firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;//最后一个线程计数器if (rh == null || rh.tid != getThreadId(current))//为空或者不是当前线程。rh = readHolds.get();//创建一个else if (rh.count == 0)readHolds.set(rh);rh.count++;//计算+1cachedHoldCounter = rh; // cache for release}return 1;}}}
自旋直到,写锁获取锁,或者需要被阻塞,或者获取当前线程获取锁,或者锁满。
doAcquireShared(arg)加入aqs等待队列,挂起
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);//传递唤醒p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
挂起的逻辑与独占锁的逻辑一致,被唤醒后,的差异是会传递唤醒下一个也是share状态的锁
setHeadAndPropagate(node, r)传递唤醒
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);//当前被唤醒的线程设为headif (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();//传递唤醒线程}}
读锁释放
public void unlock() {sync.releaseShared(1);}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//释放共享锁doReleaseShared();//唤醒aqs队列中等待的队列return true;}return false;}
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {//当前线程是头线程// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;//最后一个线程if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();//移除if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;//高位减1if (compareAndSetState(c, nextc))//cas去减statereturn nextc == 0;//等与0 的话,读锁释放}}
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);//唤醒排在第一的线程}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//PROPAGATE:-3表示下一次共享式同步状态获取将会无条件地被传播下去continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
写锁的实现
获取写锁
public void lock() {sync.acquire(1);}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();//再次中断}
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();//获取当前线程int c = getState();int w = exclusiveCount(c);//独占线程次数if (c != 0) {//有线程占有锁// (Note: if c != 0 and w == 0 then shared count != 0)if (w == 0 || current != getExclusiveOwnerThread()) //不是独占锁,或者不是当前线程占用独占锁return false;//抢占失败if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquiresetState(c + acquires);//重入次数+1 ,由于此时该线程获取了锁,不需要casreturn true;}if (writerShouldBlock() || //判断写线程是否需要阻塞,非公平锁永远不阻塞,公平锁判断aqs队列里是否有值!compareAndSetState(c, c + acquires))//cas去获取锁return false;//阻塞,或者获取失败直接返回 falsesetExclusiveOwnerThread(current);//设置当前线程获取独占锁return true;
获取失败的话加入aqs队列,等待被唤醒,并判断是否需要恢复中断标识。该逻辑与重入锁的逻辑一致,这里就不再次详细描述
释放写锁
public void unlock() {sync.release(1);}
public final boolean release(int arg) {if (tryRelease(arg)) { // 释放锁Node h = head;if (h != null && h.waitStatus != 0) //有等待的线程unparkSuccessor(h);//唤醒头节点的线程return true;}return false;}
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())//非独占锁,异常throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;//等与0的时候释放,不等于0,减去重入次数if (free)setExclusiveOwnerThread(null);//释放锁的线程setState(nextc);return free;}
因为线程独占锁所以释放的时候不需要cas
释放完成后会唤醒aqs的排在第一的线程。该唤醒与重入锁逻辑一致
