读写锁:读锁和写锁都会发生死锁。
读和读之间允许共享 ,读和写之间独占,写和写之间独占。
一,读写锁的使用
public class Demo7 {public static void main(String[] args)throws Exception {MyCache myCache=new MyCache();for (int i = 1; i <=5; i++) {int num = i ;new Thread(()->{myCache.put(String.valueOf(num),String.valueOf(num));},String.valueOf(i)).start();}TimeUnit.SECONDS.sleep(3);for (int i = 1; i <=5; i++) {int num = i ;new Thread(()->{myCache.get(String.valueOf(num));},String.valueOf(i)).start();}}}class MyCache{//volatile:表示经常变化的private volatile Map<String,Object> map=new HashMap<>();private ReadWriteLock lock=new ReentrantReadWriteLock();public void put(String key,Object val){try {lock.writeLock().lock();System.out.println(Thread.currentThread().getName()+"\t 开始写入数据"+key+"!!!!!!!!!");TimeUnit.SECONDS.sleep(1);map.put(key,val);System.out.println(Thread.currentThread().getName()+"\t 完成写入数据"+key+"----------");} catch (InterruptedException e) {e.printStackTrace();}finally {lock.writeLock().unlock();}}public void get(String key){try {lock.readLock().lock();System.out.println(Thread.currentThread().getName()+"\t 开始读取数据"+key+"!!!!!!!!!");TimeUnit.SECONDS.sleep(1);Object result = map.get(key);System.out.println(Thread.currentThread().getName()+"\t 完成读取数据"+result+"----------");} catch (InterruptedException e) {e.printStackTrace();}finally {lock.readLock().unlock();}}}
二,读写锁的原理
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个。
1.线程1写锁加锁
线程1成功上锁,流程与ReentrantLock加锁相比没有特殊之处,不同的是写锁状态占了state的低16位,而读锁使用的是state的高16位。

2.线程2读锁加锁
t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。
tryAcquireShared 返回值表示: -1 表示失败 0 表示成功,但后继节点不会继续唤醒 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1。

这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点。

t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁。
如果没有成功,在 doAcquireShared 内 for (; ; ) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;; ) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park。

3.线程3读锁加锁,线程4写锁加锁
这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子。

4.线程1写锁释放锁
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行。
这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一。

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行。

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一。

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点。
5.线程2读锁释放锁,线程3读锁释放锁
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零。

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即:

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束。

三,源码分析
1.写锁上锁流程
static final class NonfairSync extends Sync {// ... 省略无关代码// 外部类 WriteLock 方法, 方便阅读, 放在此处public void lock() {sync.acquire(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquire(int arg) {if (// 尝试获得写锁失败!tryAcquire(arg) &&// 将当前线程关联到一个 Node 对象上, 模式为独占模式// 进入 AQS 队列阻塞acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryAcquire(int acquires) {// 获得低 16 位, 代表写锁的 state 计数Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) {if (// c != 0 and w == 0 表示有读锁, 或者w == 0 ||// 如果 exclusiveOwnerThread 不是自己current != getExclusiveOwnerThread()) {// 获得锁失败return false;}// 写锁计数超过低 16 位, 报异常if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 写锁重入, 获得锁成功setState(c + acquires);return true;}if (// 判断写锁是否该阻塞, 或者writerShouldBlock() ||// 尝试更改计数失败!compareAndSetState(c, c + acquires)) {// 获得锁失败return false;}// 获得锁成功setExclusiveOwnerThread(current);return true;}// 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞final boolean writerShouldBlock() {return false;}}
2.写锁释放锁的流程
static final class NonfairSync extends Sync {// ... 省略无关代码// WriteLock 方法, 方便阅读, 放在此处public void unlock() {sync.release(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean release(int arg) {// 尝试释放写锁成功if (tryRelease(arg)) {// unpark AQS 中等待的线程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;// 因为可重入的原因, 写锁计数为 0, 才算释放成功boolean free = exclusiveCount(nextc) == 0;if (free) {setExclusiveOwnerThread(null);}setState(nextc);return free;}}
3.读锁上锁流程
static final class NonfairSync extends Sync {// ReadLock 方法, 方便阅读, 放在此处public void lock() {sync.acquireShared(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquireShared(int arg) {// tryAcquireShared 返回负数, 表示获取读锁失败if (tryAcquireShared(arg) < 0) {doAcquireShared(arg);}}// Sync 继承过来的方法, 方便阅读, 放在此处protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// 如果是其它线程持有写锁, 获取读锁失败if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current) {return -1;}int r = sharedCount(c);if (// 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且!readerShouldBlock() &&// 小于读锁计数, 并且r < MAX_COUNT &&// 尝试增加计数成功compareAndSetState(c, c + SHARED_UNIT)) {// ... 省略不重要的代码return 1;}return fullTryAcquireShared(current);}// 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁// true 则该阻塞, false 则不阻塞final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}// AQS 继承过来的方法, 方便阅读, 放在此处// 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (; ; ) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;} else if (readerShouldBlock()) {// ... 省略不重要的代码}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {// ... 省略不重要的代码return 1;}}}// AQS 继承过来的方法, 方便阅读, 放在此处private void doAcquireShared(int arg) {// 将当前线程关联到一个 Node 对象上, 模式为共享模式final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();if (p == head) {// 再一次尝试获取读锁int r = tryAcquireShared(arg);// 成功if (r >= 0) {// ㈠// r 表示可用资源数, 在这里总是 1 允许传播//(唤醒 AQS 中下一个 Share 节点)setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)shouldParkAfterFailedAcquire(p, node) &&// park 当前线程parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);}}// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置自己为 headsetHead(node);// propagate 表示有共享资源(例如共享读锁或信号量)// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATEif (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 如果是最后一个节点或者是等待共享读锁的节点if (s == null || s.isShared()) {// 进入 ㈡doReleaseShared();}}}// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处private void doReleaseShared() {// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析for (; ; ) {Node h = head;// 队列还有节点if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 下一个节点 unpark 如果成功获取读锁// 并且下下个节点还是 shared, 继续 doReleaseSharedunparkSuccessor(h);} else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}}
4.读锁释放锁的流程
static final class NonfairSync extends Sync {// ReadLock 方法, 方便阅读, 放在此处public void unlock() {sync.releaseShared(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryReleaseShared(int unused) {// ... 省略不重要的代码for (; ; ) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc)) {// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程// 计数为 0 才是真正释放return nextc == 0;}}}// AQS 继承过来的方法, 方便阅读, 放在此处private void doReleaseShared() {// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATEfor (; ; ) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0// 防止 unparkSuccessor 被多次执行if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}}
