1.类图结构


前提
读写锁内容维护了一个ReadLock和 一个WriteLock,他们依赖Sync实现具体的功能,而Sync继承自AQS,并且也提供了非公平和公平的实现在ReentrantReadWriteLock中使用高16位表示读锁,低16位表示写锁
核心参数
static final int SHARED_SHIFT = 16;//共享锁(读锁)的状态值 65536static final int SHARED_UNIT = (1 << SHARED_SHIFT);//共享线程的最大个数 65536static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//排他锁(写锁)掩码,二进制 15个1static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//返回读锁的线程数static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//返回写锁的重入个数static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//第一个获取到读锁的线程private transient Thread firstReader = null;//第一个获取到读锁的可重入次数private transient int firstReaderHoldCount;//记录最后一个获取锁的线程获取读锁的可重入次数private transient HoldCounter cachedHoldCounter;//ThreadLocal类型的变量,用来存储第一个获取读锁线程外其他线程获取读锁的可重入次数//继承自 ThreadLocal initialValue 方法返回一个 HoldCounter对象private transient ThreadLocalHoldCounter readHolds;
2.写锁的获取和释放
2.1 void lock()
写锁是一个独占锁,某个时间只有一个线程能够获取该锁,如果当前没有线程获取到读锁和写锁,则当前线程可以获取写锁然后返回如果已经有线程获取到读锁和写锁,则当前线程会被阻塞写锁是可重入锁,如果当前线程获取到写锁,再次获取时 可重入次数 +1 即可
代码实现
public void lock() {sync.acquire(1);}//和ReentrantLock一样,调用AQS中的acquire()方法public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
查看Sync中重写的 tryAquire()
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);//说明写锁或读锁已经被其他线程获取了if (c != 0) {// w == 0说明写锁没有被获取,但是 c != 0 可以得出 读锁被其他线程获取了// w != 0说明写锁被其他线程获取了,且当前线程不是当前线程//上面两种情况都返回 falseif (w == 0 || current != getExclusiveOwnerThread())return false;//能执行到这里说明是当前线程获取到了写锁,则计算可重入的次数,不能超过写锁的最大可重入次数if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 设置可重入的次数setState(c + acquires);return true;}//锁没有被人获取,则直接获取锁if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}
2.2 void unlock()
public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {//激活阻塞队列中的一个线程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
Sync自定义的 tryRelease() 方法
protected final boolean tryRelease(int releases) {//确认当前线程是否是写锁的持有者if (!isHeldExclusively())throw new IllegalMonitorStateException();//计算可重入次数int nextc = getState() - releases;//如果可重入的次数为0则返回trueboolean free = exclusiveCount(nextc) == 0;if (free)//置空setExclusiveOwnerThread(null);//更新状态值setState(nextc);return free;}
3.读锁的获取和释放
3.1 void lock()
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS的状态值state的高16位会 + 1,然后返回 如果一个线程持有写锁,则当前线程会被阻塞
public void lock() {sync.acquireShared(1);}//调用AQS中的 acquireSharedpublic final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)//调用AQS中的 doAcquireShared 方法doAcquireShared(arg);}
然后Sync中自定义了 tryAcquireShared()方法
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();//判断写锁是否被占用,如果写锁被占用且不是当前线程则返回 -1(false)//这里也就说明了一个线程既可以获取写锁,也可以获取读锁,但是释放的时候都要释放if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;//获取读锁计数int r = sharedCount(c);//尝试获取读锁,多个读线程只有一个会成功,不成功的会进入到 fullTryAcquireShared() 进行重试//readerShouldBlock() 在非公平的实现中,会去查看AQS队列中是否存在一个元素,判断第一个元素是否正在尝试获取写锁//如果不是,则判断获取读锁的线程是否达到了最大值//最后执行CAS操作将高16位的值 + 1if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {//如果还没有线程获取过读锁//则记录当前线程位读锁的第一个获取线程if (r == 0) {firstReader = current;firstReaderHoldCount = 1;//如果当前线程是第一个获取读锁的线程,则可重入次数 +1} else if (firstReader == current) {firstReaderHoldCount++;} else {//记录最后一个获取读锁的线程或记录其他线程读锁的可重入次数HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//自旋获取,类似于 tryAquireShared()return fullTryAcquireShared(current);}
3.2 void unlock()
public void unlock() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
同样是Sync自定义的 tryReleaseShared()
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();//如果当前线程是第一个获取读锁的线程if (firstReader == current) {//且重入的次数为1,则 firstReader = nullif (firstReaderHoldCount == 1)firstReader = null;else//否则 重入次数-1firstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;//如果当前线程不是最后一个获取读锁的线程if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();//则从中间 readHolds 统计的重入次数 -1int count = rh.count;if (count <= 1) {//如果 count = 1 ,则移除 HoldCounter 计数readHolds.remove();//如果 == 0则说明,无法进行 -1操作,抛出异常if (count <= 0)throw unmatchedUnlockException();}//计数-1--rh.count;}//自旋更新读锁的状态值for (;;) {int c = getState();//相当于高16位的值进行了 -1操作int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))//当前线程释放读锁成功,但是可能还有其他线程持有读锁,所以要判断 nextc == 0//不为0则说明还有其他线程持有读锁,则读锁释放失败//为0则说明没有其他线程持有读锁了,读锁释放成功return nextc == 0;}}
4.案例
使用ReentrantReadWriteLock完成一个线程安全的List
import java.util.ArrayList;import java.util.List;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReentrantList {private List<String> seed = new ArrayList<>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public void add(String e){writeLock.lock();try {seed.add(e);}finally {writeLock.unlock();}}public void remove(String e){writeLock.lock();try {seed.remove(e);}finally {writeLock.unlock();}}public String get(int index){readLock.lock();try {return seed.get(index);}finally {readLock.unlock();}}}
5.总结
ReentrantReadWriteLock的底层原理就是使用AQS实现的,ReentrantReadWriteLock使用高16位和低16位分别表示读锁和写锁的个数通过CAS对其进行操作实现了读写分离,主要在读多写少的场景下使用
