自旋:

  • 缺点:耗费cpu资源,没有竞争到锁的线程会一直占用cpu资源进行cas操作,假如一个线程获得锁后要花费n秒处理业务逻辑,娜另外一个线程就会白白花费n秒的cpu资源。
  • 思路:让得不到锁的线程让出cpu

    • yield方法:仅仅只是让出来,但是还是会争抢,即cpu下次可能还是调用它
    • sleep方法:对时间不好把控

      引入:lockSupport

      用来创建锁和其他同步类的基本线程阻塞原语;其中park和unpark的作用分别是阻塞线程和解除线程阻塞

      线程等待和唤醒的方法

  • Object类中的wait和notify方法

    • wait和notify方法必须在同步代码块中,即必须先拿到对象锁才能wait和notify。底层是线程wait之后进入了waitSet队列,然后等待别人唤醒去和entryList中阻塞的队列抢锁(非公平)
    • notify方法在wait方法之前执行不能唤醒之后的wait线程
  • juc包中的condition接口的await方法和notify方法
    • 效果同上
  • lockSupport可以阻塞当前线程和唤醒指定被阻塞的线程:park与unpark
    • 不需要在同步块中执行
    • unpark可以在park之前执行,即给一个许可证。线程拥有许可证就可以继续执行。先unpark后park则说明线程在park时消耗了一个许可证,所以不会等待。一次unpark相当于一个许可证,unpark的许可证最多给一个,多次给也只会是一个,相当于没给,再次park就会进入wait状态。
    • 底层是调用了Unsafe类中的native方法

      1. Node内部类

      ```java // node内部类 static final class Node {

}

  1. <a name="Tyl2K"></a>
  2. ## 1.1 常量字段
  3. ```java
  4. static final Node SHARED = new Node(); // 标识共享锁节点
  5. static final Node EXCLUSIVE = null; // 标识排它锁节点
  6. // 以下常量,都是waitStatus字段可能的值
  7. static final Node SHARED = new Node(); // 标识共享锁节点
  8. static final Node EXCLUSIVE = null; // 标识排它锁节点
  9. // 以下常量,都是waitStatus字段可能的值
  10. static final int CANCELLED = 1; // 取消状态,也就是不想申请或释放了
  11. static final int SIGNAL = -1; // 等待通知状态,当节点变为这个状态,唤醒后继节点
  12. static final int CONDITION = -2; // 条件状态,await/signal 场景下使用
  13. static final int PROPAGATE = -3; // 传播状态,传播共享状态

1.2 变量字段

  1. volatile int waitStatus; // 用于标识每一个等待节点状态
  2. volatile Node prev; // 前置节点指针
  3. volatile Node next; // 后继节点指针
  4. volatile Thread thread; // 持有的线程对象,可以理解为每个node代表一个线程
  5. Node nextWaiter; // 等待队列下一个节点指针(Condition中使用)

1.3 方法

  1. // 判断是否为申请共享锁的节点;
  2. final boolean isShared() {
  3. return nextWaiter == SHARED;
  4. }
  5. // 获取前置节点,如果为null抛出异常
  6. final Node predecessor() throws NullPointerException {
  7. Node p = prev;
  8. if (p == null)
  9. throw new NullPointerException();
  10. else
  11. return p;
  12. }
  13. // 构造方法
  14. Node(Thread thread, Node mode) { // Used by addWaiter
  15. this.nextWaiter = mode;
  16. this.thread = thread;
  17. }
  18. Node(Thread thread, int waitStatus) { // Used by Condition
  19. this.waitStatus = waitStatus;
  20. this.thread = thread;
  21. }

2.AQS(abstract queuee synchronizer)

2.1 AQS成员变量

  1. private transient volatile Node head;
  2. private transient volatile Node tail;
  3. private volatile int state; // 同步器状态,它的值决定了是否持有锁,通过CAS自旋完成对state值的修改
  4. // 0表示没人,自由状态;>=1表示有人占用,进入队列等着

2.2 抽象方法

AQS中包含5个未实现方法,也就是等待子类来实现个性化逻辑的,他们是:

  1. protected boolean tryAcquire(int arg); // 尝试获取排他锁
  2. protected boolean tryRelease(int arg); // 尝试释放排他锁
  3. protected int tryAcquireShared(int arg); // 尝试获取共享锁
  4. protected boolean tryReleaseShared(int arg); // 尝试释放共享锁
  5. protected boolean isHeldExclusively(); // 判断当前线程是否持有排他锁

由这5个方法可知,AQS中涉及了排它锁和共享锁两类的操作,每种锁又包含申请和释放两类操作流程,每个流程中又涉及到多个实例方法和抽象方法调用。
在AQS中,方法分为可中断的、不可中断的、同步的、带超时时间的同步的,目前整理的流程是“不可中断的同步的”。

2.3 结构

虚拟双向队列。将请求资源的线程封装成队列的节点,通过CAS、自旋以及LockSupport.park()方式,维护state变量的状态,使并发达到同步的控制效果。state+CLH
截屏2021-06-25 上午10.02.26.png

3. ReentrantLock

截屏2021-06-25 上午10.17.29.png

3.1 加锁

image.png

3.1.1 公平锁

  1. final void lock() {
  2. acquire(1);
  3. }
  4. // AQS的方法,
  5. public final void acquire(int arg) {
  6. // tryAcquire 进入1 ,addWaiter进入2, acquireQueued进入3
  7. // tryAcquire拿到了锁就不会执行下面的
  8. // 没拿到锁;进入addWaiter(加入等待队列)方法(标识排他锁)(把当前线程封装成为node)
  9. if (!tryAcquire(arg) &&
  10. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  11. // 如果是被打断的,acquireQueued方法会返回true,设置打断标记,让自己知道是被打断唤醒的
  12. selfInterrupt();
  13. }
  14. // 1.尝试获取锁
  15. protected final boolean tryAcquire(int acquires) {
  16. final Thread current = Thread.currentThread();
  17. // 拿到线程当前的状态c
  18. int c = getState();
  19. // c=0,没人占用锁,我要去上锁.(上一个人恰好处理完解锁,我恰好来上锁)
  20. if (c == 0) {
  21. // hasQueuedPredecessors 进入1.1
  22. /**
  23. * 通过对比源码发现,公平锁比非公平锁多了这块代码: !hasQueuedPredecessors()
  24. * hasQueuedPredecessors() 是做什么呢?就是判断当前同步队列中是否存在节点,如果存在节点呢,
  25. * 就返回true,由于前面有个 !,那么就是false,再根据 && 逻辑运算符的特性,不会继续执行了;
  26. *
  27. * tryAcquire()方法直接返回false,后面的逻辑就和非公平锁的一致了,就是创建Node节点,并将
  28. * 节点加入到同步队列尾; 公平锁:发现当前同步队列中存在节点,有线程在自己前面已经申请可锁,那
  29. * 自己就得乖乖的向后面排队去。
  30. *
  31. * 友情提示:在生活中,我们也需要按照先来后到去排队,保证素质; 还有就是怕你们不排队被别人打了。
  32. */
  33. // 我前面的等待队列中没人排队(我是第一个),有人排队我也要排队
  34. if (!hasQueuedPredecessors() &&
  35. // 设置值为1
  36. compareAndSetState(0, acquires)) {
  37. // 将该线程赋给当前锁对象(独占锁)
  38. setExclusiveOwnerThread(current);
  39. return true;
  40. }
  41. }
  42. // 线程必须是当前存入的,重入;如果不是当前线程,不进。
  43. else if (current == getExclusiveOwnerThread()) {
  44. // 重入,把state的值+1
  45. int nextc = c + acquires;
  46. if (nextc < 0)
  47. throw new Error("Maximum lock count exceeded");
  48. setState(nextc);
  49. return true;
  50. }
  51. return false;
  52. }
  53. // AQS方法 1.1 判断队列中有没有节点在排队,没人排队返回true,有人排队返回false
  54. public final boolean hasQueuedPredecessors() {
  55. // The correctness of this depends on head being initialized
  56. // before tail and on head.next being accurate if the current
  57. // thread is first in queue.
  58. Node t = tail; // Read fields in reverse initialization order
  59. Node h = head;
  60. Node s;
  61. return h != t &&
  62. ((s = h.next) == null || s.thread != Thread.currentThread());
  63. }
  64. // AQS 方法 2.加入等待队列(把当前线程封装成为node)
  65. // 第一次添加节点的时候会new一个双向链表等待队列,返回当前节点
  66. private Node addWaiter(Node mode) {
  67. // 把当前线程封装成为node,mode为独占锁
  68. Node node = new Node(Thread.currentThread(), mode);
  69. // Try the fast path of enq; backup to full enq on failure
  70. Node pred = tail;
  71. // 队列为空,需要初始化队列,不会进(第一次队列还未初始化);
  72. if (pred != null) {
  73. // 队列不为空,进入队列中,将自己设置为tail节点
  74. node.prev = pred;
  75. if (compareAndSetTail(pred, node)) {
  76. pred.next = node;
  77. return node;
  78. }
  79. }
  80. // 2.1 加入队列
  81. enq(node);
  82. return node;
  83. }
  84. // AQS方法 2.1 进入队列尾部
  85. private Node enq(final Node node) {
  86. for (;;) { // 自旋
  87. Node t = tail;
  88. // 如果尾结点是空的,先初始化;new一个哑结点,放在队头
  89. if (t == null) { // Must initialize
  90. // new哑结点,并将头尾指针指向这个哑结点
  91. if (compareAndSetHead(new Node()))
  92. tail = head;
  93. } else {
  94. // 再添加一个当前的节点(自己)
  95. node.prev = t;
  96. // 把自己设置为队尾节点
  97. if (compareAndSetTail(t, node)) {
  98. t.next = node;
  99. return t;
  100. }
  101. }
  102. }
  103. }
  104. // AQS方法3.
  105. // 判断我之前的节点是park,那么我也park;如果上一个节点不是park,竞争锁
  106. // 自旋过程,每个节点来的时候每次进入自旋都会判断我能不能拿到锁,如果不能拿到锁,就先把上一个节点
  107. // 的waitStatus改成-1,自己进入阻塞状态
  108. final boolean acquireQueued(final Node node, int arg) {
  109. // 异常1:这个结点自己不想等了
  110. boolean failed = true;
  111. try {
  112. // 异常2:排队的过程被打断了
  113. boolean interrupted = false;
  114. // 自旋过程,每一次自旋都会去尝试获得一次锁,获得失败才会走下面的流程自旋
  115. for (;;) {
  116. // 拿到上一个节点p
  117. final Node p = node.predecessor();
  118. // 如果上一个节点p是head(即自己是进入等待队列自己是第二个节点,排队中的第一个节点)
  119. // unpark拿到锁之后会进入这个方法
  120. if (p == head && tryAcquire(arg)) {
  121. // 获取到锁,就把自己变成head(哑结点),并清空当前存的线程,这个时候如果不是队尾,waitStatus就是-1
  122. setHead(node);
  123. // 把前面的p节点gc
  124. p.next = null; // help GC
  125. // 正常结束,没有被取消
  126. failed = false;
  127. // 如果被打断,返回true;不被打断,返回false
  128. return interrupted;
  129. }
  130. // 上一个节点不是head(自己不是第二个节点)或者锁还是被占用着,进入3.1 shouldParkAfterFailedAcquire:
  131. // 判断上一个节点p是否需要唤醒当前节点
  132. // 如果是-1,进入3.2 parkAndCheckInterrupt 则将自己park住,等待上一个节点唤醒
  133. // 如果不是-1,设置成-1,并进行下一次自旋
  134. // 整个流程:第一趟:将哨兵节点的值变成-1,第二趟:返回true,然后将自己park住(暂停执行,等待恢复执行)
  135. // 如果是被打断的,parkAndCheckInterrupt方法会返回true,interrupted就置为true表示自己是被打断的
  136. if (shouldParkAfterFailedAcquire(p, node) &&
  137. parkAndCheckInterrupt())
  138. interrupted = true;
  139. }
  140. } finally {
  141. if (failed)
  142. cancelAcquire(node);
  143. }
  144. }
  145. //AQS 方法 3.1 判断前一个节点的waitStatus值,如果是-1,返回true;如果是1,取消;如果是其他,将值改成-1,返回false进行下次自旋
  146. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  147. int ws = pred.waitStatus;
  148. // 如果是-1,需要唤醒下个节点,返回true
  149. if (ws == Node.SIGNAL)
  150. /*
  151. * This node has already set status asking a release
  152. * to signal it, so it can safely park.
  153. */
  154. return true;
  155. // 1表示线程取消判断
  156. if (ws > 0) {
  157. /*
  158. * Predecessor was cancelled. Skip over predecessors and
  159. * indicate retry.
  160. */
  161. do {
  162. node.prev = pred = pred.prev;
  163. } while (pred.waitStatus > 0);
  164. pred.next = node;
  165. } else {
  166. /*
  167. * waitStatus must be 0 or PROPAGATE. Indicate that we
  168. * need a signal, but don't park yet. Caller will need to
  169. * retry to make sure it cannot acquire before parking.
  170. */
  171. // 否则,修改这个值为-1
  172. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  173. }
  174. return false;
  175. }
  176. // AQS方法 park住自己
  177. private final boolean parkAndCheckInterrupt() {
  178. // 线程挂起,程序不会向下执行
  179. LockSupport.park(this);
  180. // 根据park的api描述,程序会在以下三种情况向下执行:
  181. // 1.被unpark
  182. // 2.被中断(interrupt)
  183. // 3.其他不合逻辑的返回
  184. // 因上述三种情况程序执行到此处,返回当前线程的中断状态,并清空中断状态
  185. // 如果由于被中断,该方法会返回true
  186. return Thread.interrupted();
  187. }

3.1.2 非公平锁的不同点

  1. final void lock() {
  2. // 先竞争,竞争失败再排队
  3. if (compareAndSetState(0, 1))
  4. // 设置占用线程为当前线程,独占锁
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else
  7. acquire(1);
  8. }
  9. final boolean nonfairTryAcquire(int acquires) {
  10. final Thread current = Thread.currentThread();
  11. int c = getState();
  12. if (c == 0) {
  13. // 不用在乎自己是不是队首。
  14. if (compareAndSetState(0, acquires)) {
  15. setExclusiveOwnerThread(current);
  16. return true;
  17. }
  18. }
  19. else if (current == getExclusiveOwnerThread()) {
  20. int nextc = c + acquires;
  21. if (nextc < 0) // overflow
  22. throw new Error("Maximum lock count exceeded");
  23. setState(nextc);
  24. return true;
  25. }
  26. return false;
  27. }

3.2 解锁

没有公平不公平之分

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. //AQS方法
  5. public final boolean release(int arg) {
  6. // 尝试释放锁 进入1.1
  7. if (tryRelease(arg)) {
  8. Node h = head;
  9. // 队列不为空
  10. if (h != null &&
  11. // 且waitStatus == Node.SIGNAL 才需要 unpark(如果是0,那么他是队尾,就不用唤醒了)
  12. h.waitStatus != 0)
  13. // 唤醒下一个线程 进入1.2
  14. unparkSuccessor(h);
  15. return true;
  16. }
  17. return false;
  18. }
  19. //AQS方法 1.1
  20. protected final boolean tryRelease(int releases) {
  21. // 更改c的状态值,getState表示当前重入锁的数量值
  22. int c = getState() - releases;
  23. if (Thread.currentThread() != getExclusiveOwnerThread())
  24. throw new IllegalMonitorStateException();
  25. boolean free = false;
  26. // 如果c为0,表示没有人占用锁了(无重入),只有 state 减为 0, 才释放成功
  27. if (c == 0) {
  28. free = true;
  29. // 将独占该锁的线程置为null
  30. setExclusiveOwnerThread(null);
  31. }
  32. setState(c);
  33. return free;
  34. }
  35. //AQS方法 1.2 唤醒线程
  36. private void unparkSuccessor(Node node) {
  37. /*
  38. * If status is negative (i.e., possibly needing signal) try
  39. * to clear in anticipation of signalling. It is OK if this
  40. * fails or if status is changed by waiting thread.
  41. */
  42. int ws = node.waitStatus;
  43. // 更改waitStatus状态为0
  44. if (ws < 0)
  45. compareAndSetWaitStatus(node, ws, 0);
  46. /*
  47. * Thread to unpark is held in successor, which is normally
  48. * just the next node. But if cancelled or apparently null,
  49. * traverse backwards from tail to find the actual
  50. * non-cancelled successor.
  51. */
  52. // 拿到头结点的下个节点s
  53. Node s = node.next;
  54. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
  55. if (s == null || s.waitStatus > 0) {
  56. s = null;
  57. for (Node t = tail; t != null && t != node; t = t.prev)
  58. if (t.waitStatus <= 0)
  59. s = t;
  60. }
  61. // 下个节点s不为null,唤醒下一个节点s
  62. // unpark后,从之前park的地方执行,即parkAndCheckInterrupt()方法
  63. if (s != null)
  64. LockSupport.unpark(s.thread);
  65. }

3.3 可打断原理

  • 不可打断模式:

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了(阻塞状态被打断会清除打断标记)。

  1. // Sync 继承自 AQS
  2. static final class NonfairSync extends Sync {
  3. // ...
  4. public final void acquire(int arg) {
  5. if (
  6. !tryAcquire(arg) &&
  7. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  8. ) {
  9. // 如果打断状态为 true
  10. selfInterrupt();
  11. }
  12. }
  13. static void selfInterrupt() {
  14. // 重新产生一次中断,(让线程知道自己是被打断阻塞而唤醒的,因为interrupt打断阻塞线程后会清除打断标记)这时候线程是如果正常运行的状态,那么不是处于sleep等状态,interrupt方法就不会报错
  15. Thread.currentThread().interrupt();
  16. }
  17. final boolean acquireQueued(final Node node, int arg) {
  18. boolean failed = true;
  19. try {
  20. boolean interrupted = false;
  21. for (;;) {
  22. final Node p = node.predecessor();
  23. // 再次循环拿到了锁
  24. if (p == head && tryAcquire(arg)) {
  25. setHead(node);
  26. p.next = null;
  27. // 更改failed为失败状态
  28. failed = false;
  29. // 获得锁后, 返回打断状态
  30. return interrupted;
  31. }
  32. if (
  33. shouldParkAfterFailedAcquire(p, node) &&
  34. parkAndCheckInterrupt()
  35. ) {
  36. // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
  37. interrupted = true;
  38. }
  39. }
  40. } finally {
  41. // 只有获得锁出现异常,才会执行该方法(自己实现的AQS类可能会出异常)
  42. if (failed)
  43. // 取消加锁
  44. cancelAcquire(node);
  45. }
  46. }
  47. private final boolean parkAndCheckInterrupt() {
  48. // 如果打断标记已经是 true, 则 park 会失效
  49. LockSupport.park(this);
  50. // unpark之后执行,判断阻塞是否被别人打断
  51. // interrupted 会清除打断标记
  52. return Thread.interrupted();
  53. }
  54. }
  • 可打断模式: ```java static final class NonfairSync extends Sync {

    public final void acquireInterruptibly(int arg) throws InterruptedException {

    1. if (Thread.interrupted())
    2. throw new InterruptedException();
    3. // 如果没有获得到锁, 进入 ㈠
    4. if (!tryAcquire(arg))
    5. doAcquireInterruptibly(arg);

    }

    // ㈠ 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException {

    1. final Node node = addWaiter(Node.EXCLUSIVE);
    2. boolean failed = true;
    3. try {
    4. for (;;) {
    5. final Node p = node.predecessor();
    6. if (p == head && tryAcquire(arg)) {
    7. setHead(node);
    8. p.next = null; // help GC
    9. failed = false;
    10. return;
    11. }
    12. if (shouldParkAfterFailedAcquire(p, node) &&
    13. parkAndCheckInterrupt()) {
    14. // 在 park 过程中如果被 interrupt 会进入此
    15. // 这时候抛出异常, 而不会再次进入 for (;;)
    16. throw new InterruptedException();
    17. }
    18. }
    19. } finally {
    20. if (failed)
    21. cancelAcquire(node);
    22. }

    } }

``` 区别在于不可打断模式是将打断标记置为true,再次进入for循环(在AQS队列中);可打断模式直接抛出异常,不会进入for循环。