Java 读写锁

1、StampedLock特性

StampedLock是JDK 8新增的读写锁,跟读写锁不同,它并不是由AQS实现的。它的state为一个long型变量,状态的设计也不同于读写锁,且提供了三种模式来控制 read/write 的获取,并且内部实现了自己的同步等待队列。

1.1、StampedLock读写锁

写锁:使用writeLock方法获取,当锁不可用时会阻塞,获取成功后返回一个与这个写锁对应的stamp,在unlockWrite方法中,需要通过这个stamp来释放与之对应的锁。在tryWriteLock同样也会提供这个stamp。当在write模式中获取到写锁时,读锁不能被获取,并且所有的乐观读锁验证(validate方法)都会失败。
读锁:使用readLock方法获取,当超出可用资源时(类似AQS的state设计)会阻塞。同样的,在获取锁成功后也会返回stamp,作用与上述相同。tryReadLock同样如此。
乐观读锁:使用tryOptimisticRead方法获取,只有在写锁可用时才能成功获取乐观读锁,获取成功后也会返回一个stamp。validate方法可以根据这个stamp来判断写锁是否被获取。这种模式可以理解为一个弱化的读锁(weak version of a read-lock),它在任何时候都能被破坏。乐观读模式常被用在短的只读的代码段,用来减少争用并提高吞吐量。乐观读区域应该只读取字段,并将它们保存在本地变量中,以便在验证(validate方法)后使用。在乐观读模式中字段的读取可能会不一致,所以可能需要反复调用validate()来检查一致性。例如,当首次读取一个对象或数组引用,然后访问其中一个的字段、元素或方法时,这些步骤通常是必需的。

1.2、三种模式的转换

StampedLock可以将三种模式是锁进行有条件的互相转换。
将其他锁转换为写锁**tryConvertToWriteLock()**
当前邮戳为持有写锁模式,直接返回当前的邮戳;
当前邮戳为持有读锁模式,则会释放读锁并获取写锁,并返回写锁邮戳;
当前邮戳持有乐观锁,通过CAS立即获取写锁,成功则返回写锁邮戳;失败则返回0;
将其他锁转换为读锁**tryConvertToReadLock**
当前邮戳为持有写锁模式,则会释放写锁并获取读锁,并返回读锁邮戳;
当前邮戳为持有读锁模式,则直接返回当前读锁邮戳;
当前邮戳持有乐观锁,通过CAS立即获取读锁,则返回读锁邮戳;否则,获取失败返回0;
将其他锁转换为乐观锁**tryConvertToOptimisticRead**
当前邮戳为持有读或写锁,则直接释放读写锁,并返回释放后的观察者邮戳值;
当前邮戳持有乐观锁,若乐观锁邮戳有效,则返回观察者邮戳;

1.3、StampedLock的应用场景

StampedLock一般作为线程安全的内部工具类。它的使用依赖于对数据、对象和方法的内部属性有一定的了解。StampedLock 是不可重入的,所以在锁的内部不能调用其他尝试重复获取锁的方。一个stamp如果在很长时间都没有使用或验证,在很长一段时间之后可能就会验证失败。StampedLocks是可序列化的,但是反序列化后变为初始的非锁定状态,所以在远程锁定中是不安全的。

1.4、StampedLock的公平性

StampedLock 的调度策略不会始终偏向读线程或写线程,所有的”try”方法都是尽最大努力获取,并不一定遵循任何调度或公平策略。从”try”方法获取或转换锁失败返回0时,不会携带任何锁的状态信息。由于StampedLock支持跨多个锁模式的协调使用,它不会直接实现Lock或ReadWriteLock接口。但是,如果应用程序需要Lock的相关功能,它可以通过asReadLock()asWriteLock()asReadWriteLock()方法返回一个Lock视图。

2、源码分析

2.1、主要属性

  1. //获取CPU的可用线程数量,用于确定自旋的时候循环次数
  2. private static final int NCPU = Runtime.getRuntime().availableProcessors();
  3. //根据NCPU确定自旋的次数限制(并不是一定这么多次,因为实际代码中是随机的)
  4. private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
  5. //头节点上的自旋次数
  6. private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
  7. //头节点上的最大自旋次数
  8. private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
  9. //等待自旋锁溢出的周期数
  10. private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
  11. //在溢出之前读线程计数用到的bit位数
  12. private static final int LG_READERS = 7;
  13. //一个读状态单位:0000 0000 0001
  14. private static final long RUNIT = 1L;
  15. //一个写状态单位:0000 1000 0000
  16. private static final long WBIT = 1L << LG_READERS;
  17. //读状态标识:0000 0111 1111
  18. private static final long RBITS = WBIT - 1L;
  19. //读锁计数的最大值:0000 0111 1110
  20. private static final long RFULL = RBITS - 1L;
  21. //读线程个数和写线程个数的掩码:0000 1111 1111
  22. private static final long ABITS = RBITS | WBIT;
  23. //// 读线程个数的反数,高25位全部为1:1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
  24. private static final long SBITS = ~RBITS; // note overlap with ABITS
  25. // state的初始值
  26. private static final long ORIGIN = WBIT << 1;
  27. // 中断标识
  28. private static final long INTERRUPTED = 1L;
  29. // 节点状态值,等待/取消
  30. private static final int WAITING = -1;
  31. private static final int CANCELLED = 1;
  32. // 节点模式,读模式/写模式
  33. private static final int RMODE = 0;
  34. private static final int WMODE = 1;
  35. //等待队列的头节点
  36. private transient volatile WNode whead;
  37. //等待队列的尾节点
  38. private transient volatile WNode wtail;
  39. // 锁状态
  40. private transient volatile long state;
  41. ////因为读状态只有7位很小,所以当超过了128之后将使用此变量来记录
  42. private transient int readerOverflow;

2.2、node节点实现

  1. //等待队列的节点实现
  2. static final class WNode {
  3. //前驱节点
  4. volatile WNode prev;
  5. //后继节点
  6. volatile WNode next;
  7. //读线程使用的链表
  8. volatile WNode cowait; // list of linked readers
  9. //等待的线程
  10. volatile Thread thread; // non-null while possibly parked
  11. //节点状态
  12. volatile int status; // 0, WAITING, or CANCELLED
  13. //节点模式
  14. final int mode; // RMODE or WMODE
  15. WNode(int m, WNode p) { mode = m; prev = p; }
  16. }

2.3、state状态实现

state状态说明:

  • bit0—bit6为作为读锁计数,当超出RFULL(126)时,用readerOverflow作为读锁计数。当获取到读锁时,state加RUINT(值为1);当释放读锁时,state减去RUINT。
  • bit7为写锁标识,其值为1表示已获取写锁,值为0表示已获取读锁。当线程获取写锁或释放写锁时,都会将state加WBIT;
  • bit8—bit64:表示写锁版本,不论获取写锁还是释放写锁,其值都会改变。
    初始状态bit8为1,其他位为0。

state常用状态标识:

  • RUNIT:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
  • WBIT:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 1000 0000
  • RBITS:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0111 1111
  • RFULL:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0111 1110
  • ABITS:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 1111 1111
  • SBITS:
    1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
  • ORIGIN:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0000

state常用状态判断:

  • 有无线程获取写状态:state < WBIT,true:无,false:有;
  • 读状态是否溢出:(state & ABITS) < RFULL,true;否;false:是;
  • 获取读状态: state + RUNIT(或者readerOverflow + 1)
  • 获取写状态: state + WBIT
  • 释放读状态: state - RUNIT(或者readerOverflow - 1)
  • 释放写状态: (s += WBIT) == 0L ? ORIGIN : s
  • 是否为写锁: (state & WBIT) != 0L
  • 是否为读锁: (state & RBITS) != 0L

    2.4、写锁获取及释放

    获取写锁: ```java public long writeLock() { long s, next;
    //(state & ABITS)获取低8位,判断有无读/写锁存在, //有其他读锁则bit0-bit6不为0,有其他写锁则bit7不为0, //因此如果有别的写锁或者读锁存在将失败 //尝试CAS获取写锁 //失败调用acquireWrite继续获取写锁 return ((((s = state) & ABITS) == 0L &&
    1. U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
    2. next : acquireWrite(false, 0L));
    }

private long acquireWrite(boolean interruptible, long deadline) { //node为当前节点,p为当前节点的前驱节点 WNode node = null, p;

  1. // 第一次自旋——主要进行入队工作
  2. for (int spins = -1;;) { // spin while enqueuing
  3. long m, s, ns;
  4. //(state&ABITS)为0表示无读/写锁,则尝试CAS获取写锁
  5. if ((m = (s = state) & ABITS) == 0L) {
  6. if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
  7. return ns;
  8. }
  9. else if (spins < 0)
  10. // 如果自旋次数小于0,则计算自旋的次数
  11. // 如果当前有写锁(m == WBIT)独占,且队列无元素(wtail == whead),
  12. // 说明当前节点的前驱节点为获取独占锁的节点,锁很快就会释放,
  13. // 就自旋SPINS次就行了,如果自旋完了还没轮到自己才入队
  14. // 否则自旋次数为0
  15. spins = (m == WBIT && wtail == whead) ? SPINS : 0;
  16. else if (spins > 0) {
  17. // 当自旋次数大于0时,当前这次自旋随机减一次自旋次数
  18. if (LockSupport.nextSecondarySeed() >= 0)
  19. --spins;
  20. }
  21. else if ((p = wtail) == null) { // initialize queue
  22. // 如果队列未初始化,新建一个空节点并初始化头节点和尾节点
  23. WNode hd = new WNode(WMODE, null);
  24. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  25. wtail = hd;
  26. }
  27. else if (node == null)
  28. // 如果新增节点还未初始化,则新建之,并赋值其前置节点为尾节点
  29. node = new WNode(WMODE, p);
  30. else if (node.prev != p)
  31. // 如果新增节点的前驱节点不是尾节点,
  32. // 则更新新增节点的前驱节点为新的尾节点
  33. node.prev = p;
  34. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
  35. // 尝试更新新增节点为新的尾节点成功,则退出循环
  36. p.next = node;
  37. break;
  38. }
  39. }
  40. // 第二次自旋——主要进行阻塞并等待唤醒
  41. for (int spins = -1;;) {
  42. // h为头节点,np为新增节点的前置节点,pp为前前置节点,ps为前置节点的状态
  43. WNode h, np, pp; int ps;
  44. // 如果头节点等于前置节点,说明快轮到自己了
  45. if ((h = whead) == p) {
  46. if (spins < 0)
  47. // 自旋次数小于0,则初始化自旋次数
  48. spins = HEAD_SPINS;
  49. else if (spins < MAX_HEAD_SPINS)
  50. // 自旋次数小于头结点最大自旋次数,则增加自旋次数
  51. spins <<= 1;
  52. // 第三次自旋,不断尝试获取写锁
  53. for (int k = spins;;) { // spin at head
  54. long s, ns;
  55. //无读写锁则CAS获取写锁,成功则更新节点信息
  56. if (((s = state) & ABITS) == 0L) {
  57. if (U.compareAndSwapLong(this, STATE, s,
  58. ns = s + WBIT)) {
  59. whead = node;
  60. node.prev = null;
  61. return ns;
  62. }
  63. }
  64. // 随机立减自旋次数,当自旋次数减为0时跳出循环再重试
  65. else if (LockSupport.nextSecondarySeed() >= 0 &&
  66. --k <= 0)
  67. break;
  68. }
  69. }
  70. //头节点为空?
  71. else if (h != null) { // help release stale waiters
  72. WNode c; Thread w;
  73. // 如果头节点的cowait链表(栈)不为空,唤醒里面的所有节点
  74. while ((c = h.cowait) != null) {
  75. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  76. (w = c.thread) != null)
  77. U.unpark(w);
  78. }
  79. }
  80. // 如果头节点没有变化
  81. if (whead == h) {
  82. // 如果尾节点有变化,则更新
  83. if ((np = node.prev) != p) {
  84. if (np != null)
  85. (p = np).next = node; // stale
  86. }
  87. else if ((ps = p.status) == 0)
  88. // 如果尾节点状态为0,则更新成WAITING
  89. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  90. else if (ps == CANCELLED) {
  91. // 如果尾节点状态为取消,则把它从链表中删除
  92. if ((pp = p.prev) != null) {
  93. node.prev = pp;
  94. pp.next = node;
  95. }
  96. }
  97. else {
  98. // 有超时时间的处理
  99. long time; // 0 argument to park means no timeout
  100. if (deadline == 0L)
  101. time = 0L;
  102. //时间以过期则取消节点
  103. else if ((time = deadline - System.nanoTime()) <= 0L)
  104. return cancelWaiter(node, node, false);
  105. //设置线程blocker
  106. Thread wt = Thread.currentThread();
  107. U.putObject(wt, PARKBLOCKER, this);
  108. node.thread = wt;
  109. //1、当前节点前驱节点状态为WAITING;
  110. //2、当前节点的前驱节点不是头节点或有读写锁已经被获取;
  111. //3、头结点为改变;
  112. //4、当前节点的前驱节点未改变;
  113. //当以上4个条件都满足时将当前节点进行阻塞
  114. if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
  115. whead == h && node.prev == p)
  116. U.park(false, time); // emulate LockSupport.park
  117. node.thread = null;
  118. //设置线程blocker为空
  119. U.putObject(wt, PARKBLOCKER, null);
  120. //当前节点百中断则取消节点
  121. if (interruptible && Thread.interrupted())
  122. return cancelWaiter(node, node, true);
  123. }
  124. }
  125. }

}

  1. **写锁释放:**
  2. ```java
  3. public void unlockWrite(long stamp) {
  4. WNode h;
  5. //因为写锁是独占锁,可以简单判断state != stamp;
  6. //或者bit7为0,即stamp状态为无写锁
  7. if (state != stamp || (stamp & WBIT) == 0L)
  8. throw new IllegalMonitorStateException();
  9. //修改state状态,state += WBIT;溢出则初始化为ORIGIN
  10. state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
  11. //头节点不为空,且状态正常则释放头结点的锁
  12. if ((h = whead) != null && h.status != 0)
  13. release(h);
  14. }
  15. private void release(WNode h) {
  16. if (h != null) {
  17. WNode q; Thread w;
  18. //设置头节点状态为0
  19. U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
  20. //头节点下个节点为空或状态为CANCEL,则从尾节点前向遍历,
  21. //找到头结点后面的第一个有效节点(状态为0或WAITTING)
  22. if ((q = h.next) == null || q.status == CANCELLED) {
  23. for (WNode t = wtail; t != null && t != h; t = t.prev)
  24. if (t.status <= 0)
  25. q = t;
  26. }
  27. //唤醒下一个有效节点
  28. if (q != null && (w = q.thread) != null)
  29. U.unpark(w);
  30. }
  31. }

2.5、读锁获取及释放

读锁获取:

  1. public long readLock() {
  2. long s = state, next; // bypass acquireRead on common uncontended case
  3. //同步队列为空,读锁计数值未超过最大值(无写锁),CAS获取锁成功,则直接返回邮戳值;
  4. //否则acquireRead获取读锁
  5. return ((whead == wtail && (s & ABITS) < RFULL &&
  6. U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
  7. next : acquireRead(false, 0L));
  8. }
  9. private long acquireRead(boolean interruptible, long deadline) {
  10. WNode node = null, p;
  11. for (int spins = -1;;) {
  12. WNode h;
  13. //如果同步队列头节点等于尾节点,说明队列中没有节点或者只有一个节点
  14. //则自旋一段时间,等待头结点释放锁后获取锁
  15. if ((h = whead) == (p = wtail)) {
  16. for (long m, s, ns;;) {
  17. //(state & ABITS)小于RFULL,表示无写锁,则直接CAS获取读锁;
  18. //否则若(state & ABITS)小于WBIT表示无写锁但读锁计数已溢出,
  19. //则CAS更新读锁计数获取读锁
  20. if ((m = (s = state) & ABITS) < RFULL ?
  21. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
  22. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
  23. return ns;
  24. //(state & ABITS)大于WBIT,表示写锁已经被占,则自旋
  25. else if (m >= WBIT) {
  26. //随机减自旋次数
  27. if (spins > 0) {
  28. if (LockSupport.nextSecondarySeed() >= 0)
  29. --spins;
  30. }
  31. else {
  32. //自旋完还没获取到读锁?
  33. if (spins == 0) {
  34. WNode nh = whead, np = wtail;
  35. //判断稳定性(有没有被修改),跳出循环
  36. if ((nh == h && np == p) || (h = nh) != (p = np))
  37. break;
  38. }
  39. //初始化spins
  40. spins = SPINS;
  41. }
  42. }
  43. }
  44. }
  45. //自旋获取失败,则进行节点初始化相关的处理
  46. //尾节点为空,则初始化队列
  47. if (p == null) { // initialize queue
  48. WNode hd = new WNode(WMODE, null);
  49. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  50. wtail = hd;
  51. }
  52. //初始化代表当前读线程的节点
  53. else if (node == null)
  54. node = new WNode(RMODE, p);
  55. //head==tail或者队列tail.mode不为读状态,
  56. //那么将当前线程的节点node加入到队列尾部并跳出外层循环
  57. else if (h == p || p.mode != RMODE) {
  58. if (node.prev != p)
  59. node.prev = p;
  60. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
  61. p.next = node;
  62. break;
  63. }
  64. }
  65. //如果head!= tail说明队列中已经有线程在等待或者tail.mode是读状态RMODE,
  66. //那么CAS方式将当前线程的节点node加入到tail节点的cowait链中
  67. else if (!U.compareAndSwapObject(p, WCOWAIT,
  68. node.cowait = p.cowait, node))
  69. node.cowait = null;
  70. else {
  71. for (;;) {
  72. WNode pp, c; Thread w;
  73. ////如果head不为空那么尝试去解放head的cowait链中的节点
  74. if ((h = whead) != null && (c = h.cowait) != null &&
  75. U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  76. (w = c.thread) != null) // help release
  77. U.unpark(w);
  78. //如果tail节点的前驱就是head或者head==tail或者tail节点的前驱是null
  79. //也就是说当前node所在的节点(因为node可能在cowait链中)
  80. //的前驱就是head或者head已经被释放了为null
  81. if (h == (pp = p.prev) || h == p || pp == null) {
  82. long m, s, ns;
  83. do {
  84. //如果没有写状态被占有那么自旋方式尝试获取读状态,成功则返回stamp
  85. if ((m = (s = state) & ABITS) < RFULL ?
  86. U.compareAndSwapLong(this, STATE, s,
  87. ns = s + RUNIT) :
  88. (m < WBIT &&
  89. (ns = tryIncReaderOverflow(s)) != 0L))
  90. return ns;
  91. } while (m < WBIT);
  92. }
  93. //判断是否稳定
  94. if (whead == h && p.prev == pp) {
  95. long time;
  96. //如果tail的前驱是null或者head==tail或者tail已经被取消了(p.status > 0)
  97. //直接将node置为null跳出循环,回到最开的for循环中去再次尝试获取同步状态
  98. if (pp == null || h == p || p.status > 0) {
  99. node = null; // throw away
  100. break;
  101. }
  102. if (deadline == 0L)
  103. time = 0L;
  104. //如果超时则取消当前线程
  105. else if ((time = deadline - System.nanoTime()) <= 0L)
  106. return cancelWaiter(node, p, false);
  107. Thread wt = Thread.currentThread();
  108. U.putObject(wt, PARKBLOCKER, this);
  109. node.thread = wt;
  110. //tail的前驱不是head或者当前只有写线程获取到同步状态
  111. //判断稳定性
  112. if ((h != pp || (state & ABITS) == WBIT) &&
  113. whead == h && p.prev == pp)
  114. U.park(false, time);
  115. node.thread = null;
  116. U.putObject(wt, PARKBLOCKER, null);
  117. //中断的话取消
  118. if (interruptible && Thread.interrupted())
  119. return cancelWaiter(node, p, true);
  120. }
  121. }
  122. }
  123. }
  124. //如果队列中没有节点或者tail的mode是WMODE写状态,
  125. //那么node被加入到队列的tail之后进入这个循环
  126. for (int spins = -1;;) {
  127. WNode h, np, pp; int ps;
  128. //如果p(node的前驱节点)就是head,那么自旋方式尝试获取同步状态
  129. if ((h = whead) == p) {
  130. //第一次循环,设置自旋次数
  131. if (spins < 0)
  132. spins = HEAD_SPINS;
  133. //自旋次数增加
  134. else if (spins < MAX_HEAD_SPINS)
  135. spins <<= 1;
  136. for (int k = spins;;) { // spin at head
  137. long m, s, ns;
  138. //自旋方式尝试获取同步状态
  139. //获取成功的话将node设置为head并解放node的cowait链中的节点并返回stamp
  140. if ((m = (s = state) & ABITS) < RFULL ?
  141. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
  142. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
  143. WNode c; Thread w;
  144. whead = node;
  145. node.prev = null;
  146. while ((c = node.cowait) != null) {
  147. if (U.compareAndSwapObject(node, WCOWAIT,
  148. c, c.cowait) &&
  149. (w = c.thread) != null)
  150. U.unpark(w);
  151. }
  152. return ns;
  153. }
  154. //如果有写线程获取到了同步状态(因为可能有写线程闯入)那么随机的--k控制循环次数
  155. else if (m >= WBIT &&
  156. LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
  157. break;
  158. }
  159. }
  160. //如果head不为null,解放head的cowait链中的节点
  161. else if (h != null) {
  162. WNode c; Thread w;
  163. while ((c = h.cowait) != null) {
  164. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  165. (w = c.thread) != null)
  166. U.unpark(w);
  167. }
  168. }
  169. //判断稳定性
  170. if (whead == h) {
  171. if ((np = node.prev) != p) {
  172. if (np != null)
  173. (p = np).next = node; // stale
  174. }
  175. //尝试设tail的状态位WAITING表示后面还有等待的节点
  176. else if ((ps = p.status) == 0)
  177. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  178. //如果tail已经取消了
  179. else if (ps == CANCELLED) {
  180. if ((pp = p.prev) != null) {
  181. node.prev = pp;
  182. pp.next = node;
  183. }
  184. }
  185. else {
  186. //超时判定
  187. long time;
  188. if (deadline == 0L)
  189. time = 0L;
  190. else if ((time = deadline - System.nanoTime()) <= 0L)
  191. return cancelWaiter(node, node, false);
  192. Thread wt = Thread.currentThread();
  193. U.putObject(wt, PARKBLOCKER, this);
  194. node.thread = wt;
  195. //阻塞等待
  196. if (p.status < 0 &&
  197. (p != h || (state & ABITS) == WBIT) &&
  198. whead == h && node.prev == p)
  199. U.park(false, time);
  200. node.thread = null;
  201. U.putObject(wt, PARKBLOCKER, null);
  202. //中断处理
  203. if (interruptible && Thread.interrupted())
  204. return cancelWaiter(node, node, true);
  205. }
  206. }
  207. }
  208. }

读锁释放:

  1. public void unlockRead(long stamp) {
  2. long s, m; WNode h;
  3. for (;;) {
  4. //写计数相关的bit位有改变,或读计数相关的bit位为0,或有写锁,则异常
  5. if (((s = state) & SBITS) != (stamp & SBITS) ||
  6. (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
  7. throw new IllegalMonitorStateException();
  8. //读锁计数未溢出?
  9. if (m < RFULL) {
  10. if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
  11. if (m == RUNIT && (h = whead) != null && h.status != 0)
  12. release(h);
  13. break;
  14. }
  15. }
  16. //读锁计数溢出?
  17. else if (tryDecReaderOverflow(s) != 0L)
  18. break;
  19. }
  20. }

2.6、乐观锁获取

  1. public long tryOptimisticRead() {
  2. long s;
  3. //当无写锁,则返回(state & SBITS),即高位的写计数;
  4. //否则返回0,即获取乐观锁失败
  5. return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
  6. }

2.7、锁模式的转换

转换为写锁:将其他模式(如写锁、读锁、乐观锁)的锁转换为写锁

  1. public long tryConvertToWriteLock(long stamp) {
  2. long a = stamp & ABITS, m, s, next;
  3. //写锁的状态未变?
  4. while (((s = state) & SBITS) == (stamp & SBITS)) {
  5. //当前无写/读锁?CAS获取写锁
  6. if ((m = s & ABITS) == 0L) {
  7. if (a != 0L)
  8. break;
  9. if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
  10. return next;
  11. }
  12. //当前有写锁,且状态不变,则直接返回原本的版本邮戳
  13. else if (m == WBIT) {
  14. //写线程不为当前线程?错误,跳出循环
  15. if (a != m)
  16. break;
  17. return stamp;
  18. }
  19. //无写锁,有一个读锁,即当前线程为获取读锁的线程
  20. //直接cas将读锁释放并获取写锁,即state=(state - RUNIT + WBIT)
  21. else if (m == RUNIT && a != 0L) {
  22. if (U.compareAndSwapLong(this, STATE, s,
  23. next = s - RUNIT + WBIT))
  24. return next;
  25. }
  26. //当还有其他线程获取读锁时,自旋等待其他读锁释放
  27. else
  28. break;
  29. }
  30. return 0L;
  31. }

主要处理:
当锁状态为无读/写锁时,通过CAS获取写锁;
当当前线程已获取写锁时,直接返回原本的邮戳;
当只有一个读锁,即当只有当前线程获取读锁,没有其他的读锁时,CAS释放读锁并获取写锁;
当获取读锁的线程数大于1,即除当前线程还有其他线程也获取到读锁时,自旋等待其他读锁释放;
转换为读锁:将读/写锁转换为读锁;

  1. public long tryConvertToReadLock(long stamp) {
  2. long a = stamp & ABITS, m, s, next; WNode h;
  3. //无写锁?
  4. while (((s = state) & SBITS) == (stamp & SBITS)) {
  5. //无读/写锁,则CAS获取读锁
  6. if ((m = s & ABITS) == 0L) {
  7. //邮戳错误?退出循环
  8. if (a != 0L)
  9. break;
  10. //读锁计数未溢出?
  11. else if (m < RFULL) {
  12. if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
  13. return next;
  14. }
  15. //读锁计数溢出?
  16. else if ((next = tryIncReaderOverflow(s)) != 0L)
  17. return next;
  18. }
  19. //有写锁,且写锁为当前线程?则释放读锁并唤醒后续写锁线程
  20. else if (m == WBIT) {
  21. //写锁不为当前线程,错误,跳出循环
  22. if (a != m)
  23. break;
  24. state = next = s + (WBIT + RUNIT);
  25. if ((h = whead) != null && h.status != 0)
  26. release(h);
  27. return next;
  28. }
  29. //当前线程已经获取得读锁?
  30. else if (a != 0L && a < WBIT)
  31. return stamp;
  32. else
  33. break;
  34. }
  35. return 0L;
  36. }

主要处理:
若无读/写锁,则CAS获取读锁;
若写锁为当前线程,则CAS释放写锁并获取读锁;
若当前线程已经获取读锁,则返回原本的邮戳;
转换为乐观锁:

  1. public long tryConvertToOptimisticRead(long stamp) {
  2. long a = stamp & ABITS, m, s, next; WNode h;
  3. U.loadFence();
  4. for (;;) {
  5. //写锁状态变更?
  6. if (((s = state) & SBITS) != (stamp & SBITS))
  7. break;
  8. //无读/写锁?直接返回state
  9. if ((m = s & ABITS) == 0L) {
  10. if (a != 0L)
  11. break;
  12. return s;
  13. }
  14. //已经获取写锁?则直接释放写锁
  15. else if (m == WBIT) {
  16. if (a != m)
  17. break;
  18. state = next = (s += WBIT) == 0L ? ORIGIN : s;
  19. if ((h = whead) != null && h.status != 0)
  20. release(h);
  21. return next;
  22. }
  23. //状态为无读锁或无写锁?
  24. else if (a == 0L || a >= WBIT)
  25. break;
  26. //读锁计数未溢出
  27. else if (m < RFULL) {
  28. if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
  29. if (m == RUNIT && (h = whead) != null && h.status != 0)
  30. release(h);
  31. return next & SBITS;
  32. }
  33. }
  34. //读锁计数溢出?
  35. else if ((next = tryDecReaderOverflow(s)) != 0L)
  36. return next & SBITS;
  37. }
  38. return 0L;
  39. }

主要处理:
若当前线程已经获取写锁,则直接释放写锁;
若当前线程已经获取读锁,则CAS释放读锁;
当锁时,返回状态值state;