读写锁:读锁和写锁都会发生死锁。
读和读之间允许共享 ,读和写之间独占,写和写之间独占。

一,读写锁的使用

  1. public class Demo7 {
  2. public static void main(String[] args)throws Exception {
  3. MyCache myCache=new MyCache();
  4. for (int i = 1; i <=5; i++) {
  5. int num = i ;
  6. new Thread(()->{
  7. myCache.put(String.valueOf(num),String.valueOf(num));
  8. },String.valueOf(i)).start();
  9. }
  10. TimeUnit.SECONDS.sleep(3);
  11. for (int i = 1; i <=5; i++) {
  12. int num = i ;
  13. new Thread(()->{
  14. myCache.get(String.valueOf(num));
  15. },String.valueOf(i)).start();
  16. }
  17. }
  18. }
  19. class MyCache{
  20. //volatile:表示经常变化的
  21. private volatile Map<String,Object> map=new HashMap<>();
  22. private ReadWriteLock lock=new ReentrantReadWriteLock();
  23. public void put(String key,Object val){
  24. try {
  25. lock.writeLock().lock();
  26. System.out.println(Thread.currentThread().getName()+"\t 开始写入数据"+key+"!!!!!!!!!");
  27. TimeUnit.SECONDS.sleep(1);
  28. map.put(key,val);
  29. System.out.println(Thread.currentThread().getName()+"\t 完成写入数据"+key+"----------");
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }finally {
  33. lock.writeLock().unlock();
  34. }
  35. }
  36. public void get(String key){
  37. try {
  38. lock.readLock().lock();
  39. System.out.println(Thread.currentThread().getName()+"\t 开始读取数据"+key+"!!!!!!!!!");
  40. TimeUnit.SECONDS.sleep(1);
  41. Object result = map.get(key);
  42. System.out.println(Thread.currentThread().getName()+"\t 完成读取数据"+result+"----------");
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }finally {
  46. lock.readLock().unlock();
  47. }
  48. }
  49. }

二,读写锁的原理

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个。

1.线程1写锁加锁

线程1成功上锁,流程与ReentrantLock加锁相比没有特殊之处,不同的是写锁状态占了state的低16位,而读锁使用的是state的高16位。

image.png

2.线程2读锁加锁

t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败。

tryAcquireShared 返回值表示: -1 表示失败 0 表示成功,但后继节点不会继续唤醒 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1。

image.png

这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点。

image.png

t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁。

如果没有成功,在 doAcquireShared 内 for (; ; ) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;; ) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park。

image.png

3.线程3读锁加锁,线程4写锁加锁

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子。

image.png

4.线程1写锁释放锁

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

image.png

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行。

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一。

image.png

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。

image.png

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行。

image.png

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一。

image.png

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。

image.png

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点。

5.线程2读锁释放锁,线程3读锁释放锁

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零。

image.png

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即:

image.png

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束。

image.png

三,源码分析

1.写锁上锁流程

  1. static final class NonfairSync extends Sync {
  2. // ... 省略无关代码
  3. // 外部类 WriteLock 方法, 方便阅读, 放在此处
  4. public void lock() {
  5. sync.acquire(1);
  6. }
  7. // AQS 继承过来的方法, 方便阅读, 放在此处
  8. public final void acquire(int arg) {
  9. if (
  10. // 尝试获得写锁失败
  11. !tryAcquire(arg) &&
  12. // 将当前线程关联到一个 Node 对象上, 模式为独占模式
  13. // 进入 AQS 队列阻塞
  14. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  15. ) {
  16. selfInterrupt();
  17. }
  18. }
  19. // Sync 继承过来的方法, 方便阅读, 放在此处
  20. protected final boolean tryAcquire(int acquires) {
  21. // 获得低 16 位, 代表写锁的 state 计数
  22. Thread current = Thread.currentThread();
  23. int c = getState();
  24. int w = exclusiveCount(c);
  25. if (c != 0) {
  26. if (
  27. // c != 0 and w == 0 表示有读锁, 或者
  28. w == 0 ||
  29. // 如果 exclusiveOwnerThread 不是自己
  30. current != getExclusiveOwnerThread()
  31. ) {
  32. // 获得锁失败
  33. return false;
  34. }
  35. // 写锁计数超过低 16 位, 报异常
  36. if (w + exclusiveCount(acquires) > MAX_COUNT)
  37. throw new Error("Maximum lock count exceeded");
  38. // 写锁重入, 获得锁成功
  39. setState(c + acquires);
  40. return true;
  41. }
  42. if (
  43. // 判断写锁是否该阻塞, 或者
  44. writerShouldBlock() ||
  45. // 尝试更改计数失败
  46. !compareAndSetState(c, c + acquires)
  47. ) {
  48. // 获得锁失败
  49. return false;
  50. }
  51. // 获得锁成功
  52. setExclusiveOwnerThread(current);
  53. return true;
  54. }
  55. // 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
  56. final boolean writerShouldBlock() {
  57. return false;
  58. }
  59. }

2.写锁释放锁的流程

  1. static final class NonfairSync extends Sync {
  2. // ... 省略无关代码
  3. // WriteLock 方法, 方便阅读, 放在此处
  4. public void unlock() {
  5. sync.release(1);
  6. }
  7. // AQS 继承过来的方法, 方便阅读, 放在此处
  8. public final boolean release(int arg) {
  9. // 尝试释放写锁成功
  10. if (tryRelease(arg)) {
  11. // unpark AQS 中等待的线程
  12. Node h = head;
  13. if (h != null && h.waitStatus != 0)
  14. unparkSuccessor(h);
  15. return true;
  16. }
  17. return false;
  18. }
  19. // Sync 继承过来的方法, 方便阅读, 放在此处
  20. protected final boolean tryRelease(int releases) {
  21. if (!isHeldExclusively())
  22. throw new IllegalMonitorStateException();
  23. int nextc = getState() - releases;
  24. // 因为可重入的原因, 写锁计数为 0, 才算释放成功
  25. boolean free = exclusiveCount(nextc) == 0;
  26. if (free) {
  27. setExclusiveOwnerThread(null);
  28. }
  29. setState(nextc);
  30. return free;
  31. }
  32. }

3.读锁上锁流程

  1. static final class NonfairSync extends Sync {
  2. // ReadLock 方法, 方便阅读, 放在此处
  3. public void lock() {
  4. sync.acquireShared(1);
  5. }
  6. // AQS 继承过来的方法, 方便阅读, 放在此处
  7. public final void acquireShared(int arg) {
  8. // tryAcquireShared 返回负数, 表示获取读锁失败
  9. if (tryAcquireShared(arg) < 0) {
  10. doAcquireShared(arg);
  11. }
  12. }
  13. // Sync 继承过来的方法, 方便阅读, 放在此处
  14. protected final int tryAcquireShared(int unused) {
  15. Thread current = Thread.currentThread();
  16. int c = getState();
  17. // 如果是其它线程持有写锁, 获取读锁失败
  18. if (
  19. exclusiveCount(c) != 0 &&
  20. getExclusiveOwnerThread() != current
  21. ) {
  22. return -1;
  23. }
  24. int r = sharedCount(c);
  25. if (
  26. // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
  27. !readerShouldBlock() &&
  28. // 小于读锁计数, 并且
  29. r < MAX_COUNT &&
  30. // 尝试增加计数成功
  31. compareAndSetState(c, c + SHARED_UNIT)
  32. ) {
  33. // ... 省略不重要的代码
  34. return 1;
  35. }
  36. return fullTryAcquireShared(current);
  37. }
  38. // 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁
  39. // true 则该阻塞, false 则不阻塞
  40. final boolean readerShouldBlock() {
  41. return apparentlyFirstQueuedIsExclusive();
  42. }
  43. // AQS 继承过来的方法, 方便阅读, 放在此处
  44. // 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
  45. final int fullTryAcquireShared(Thread current) {
  46. HoldCounter rh = null;
  47. for (; ; ) {
  48. int c = getState();
  49. if (exclusiveCount(c) != 0) {
  50. if (getExclusiveOwnerThread() != current)
  51. return -1;
  52. } else if (readerShouldBlock()) {
  53. // ... 省略不重要的代码
  54. }
  55. if (sharedCount(c) == MAX_COUNT)
  56. throw new Error("Maximum lock count exceeded");
  57. if (compareAndSetState(c, c + SHARED_UNIT)) {
  58. // ... 省略不重要的代码
  59. return 1;
  60. }
  61. }
  62. }
  63. // AQS 继承过来的方法, 方便阅读, 放在此处
  64. private void doAcquireShared(int arg) {
  65. // 将当前线程关联到一个 Node 对象上, 模式为共享模式
  66. final Node node = addWaiter(Node.SHARED);
  67. boolean failed = true;
  68. try {
  69. boolean interrupted = false;
  70. for (; ; ) {
  71. final Node p = node.predecessor();
  72. if (p == head) {
  73. // 再一次尝试获取读锁
  74. int r = tryAcquireShared(arg);
  75. // 成功
  76. if (r >= 0) {
  77. // ㈠
  78. // r 表示可用资源数, 在这里总是 1 允许传播
  79. //(唤醒 AQS 中下一个 Share 节点)
  80. setHeadAndPropagate(node, r);
  81. p.next = null; // help GC
  82. if (interrupted)
  83. selfInterrupt();
  84. failed = false;
  85. return;
  86. }
  87. }
  88. if (
  89. // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
  90. shouldParkAfterFailedAcquire(p, node) &&
  91. // park 当前线程
  92. parkAndCheckInterrupt()
  93. ) {
  94. interrupted = true;
  95. }
  96. }
  97. } finally {
  98. if (failed)
  99. cancelAcquire(node);
  100. }
  101. }
  102. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
  103. private void setHeadAndPropagate(Node node, int propagate) {
  104. Node h = head; // Record old head for check below
  105. // 设置自己为 head
  106. setHead(node);
  107. // propagate 表示有共享资源(例如共享读锁或信号量)
  108. // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
  109. // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
  110. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  111. (h = head) == null || h.waitStatus < 0) {
  112. Node s = node.next;
  113. // 如果是最后一个节点或者是等待共享读锁的节点
  114. if (s == null || s.isShared()) {
  115. // 进入 ㈡
  116. doReleaseShared();
  117. }
  118. }
  119. }
  120. // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
  121. private void doReleaseShared() {
  122. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  123. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析
  124. for (; ; ) {
  125. Node h = head;
  126. // 队列还有节点
  127. if (h != null && h != tail) {
  128. int ws = h.waitStatus;
  129. if (ws == Node.SIGNAL) {
  130. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  131. continue; // loop to recheck cases
  132. // 下一个节点 unpark 如果成功获取读锁
  133. // 并且下下个节点还是 shared, 继续 doReleaseShared
  134. unparkSuccessor(h);
  135. } else if (ws == 0 &&
  136. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  137. continue; // loop on failed CAS
  138. }
  139. if (h == head) // loop if head changed
  140. break;
  141. }
  142. }
  143. }

4.读锁释放锁的流程

  1. static final class NonfairSync extends Sync {
  2. // ReadLock 方法, 方便阅读, 放在此处
  3. public void unlock() {
  4. sync.releaseShared(1);
  5. }
  6. // AQS 继承过来的方法, 方便阅读, 放在此处
  7. public final boolean releaseShared(int arg) {
  8. if (tryReleaseShared(arg)) {
  9. doReleaseShared();
  10. return true;
  11. }
  12. return false;
  13. }
  14. // Sync 继承过来的方法, 方便阅读, 放在此处
  15. protected final boolean tryReleaseShared(int unused) {
  16. // ... 省略不重要的代码
  17. for (; ; ) {
  18. int c = getState();
  19. int nextc = c - SHARED_UNIT;
  20. if (compareAndSetState(c, nextc)) {
  21. // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
  22. // 计数为 0 才是真正释放
  23. return nextc == 0;
  24. }
  25. }
  26. }
  27. // AQS 继承过来的方法, 方便阅读, 放在此处
  28. private void doReleaseShared() {
  29. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
  30. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
  31. for (; ; ) {
  32. Node h = head;
  33. if (h != null && h != tail) {
  34. int ws = h.waitStatus;
  35. // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
  36. // 防止 unparkSuccessor 被多次执行
  37. if (ws == Node.SIGNAL) {
  38. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  39. continue; // loop to recheck cases
  40. unparkSuccessor(h);
  41. }
  42. // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
  43. else if (ws == 0 &&
  44. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  45. continue; // loop on failed CAS
  46. }
  47. if (h == head) // loop if head changed
  48. break;
  49. }
  50. }
  51. }