image.png

非公平锁实现原理

ReentrantLock默认实现为非公平锁,所谓的公平和非公平主要是AQS的同步器使用不一样,观察ReentrantLock的构造方法

  1. public ReentrantLock(boolean fair) {
  2. sync = fair ? new FairSync() : new NonfairSync();
  3. }
  4. public ReentrantLock() {
  5. sync = new NonfairSync();
  6. }

NonfairSync继承自 AQS
image.png

加锁流程

没有竞争时
image.png
第一个竞争出现时
image.png

竞争线程执行流程

该流程需要配合源码以及AQS原理这篇文章一起看
Thread-1 执行了

  1. CAS尝试将 state由 0 改为 1,结果失败
  2. 执行AQSacquire()方法并进入 tryAcquire()逻辑,
  3. tryAcquire()调用非公平竞争锁方法尝试获取锁,如果此时锁被释放,则Thread-1通过CAS修改state为1,则获取锁成功;否则获取失败返回false,执行下一步
  4. 接下来进入 addWaiter()逻辑,构造 Node队列,将当前线程封装成Node并加入队尾并返回当前线程节点,初始加入时对列中没有头结点和尾结点,程序创建一个空的Nodeheadtail
  5. 当前线程已经进入等待队列尾部,调用acquireQueued方法【acquireQueued是一个自旋逻辑,每次自旋都有可能尝试获取锁,前提是当前节点是head的后继节点】,找到当前节点的前驱节点,判断前驱节点是否为head节点(本例中刚好是)
    1. 如果是head节点,再次调用tryAcquire(),尝试获取锁,如果此时前面的线程释放了锁,则获取锁成功,修改当前节点为head节点;
    2. 如果不是head节点,则执行shouldParkAfterFailedAcquire
  6. 执行到这里当前线程已经最多尝试3次获取锁了【第一次执行lock的时候,第二次执行acquire()->tryAcquire(),第三次执行acquireQueued()->tryAcquire()】,但是仍然未获取到锁,那么当前线程节点需要找到一个可以唤醒自己的前驱节点,即先获取到前驱节点,

    1. 如果前驱节点的状态为SIGNAL则返回true,因为前驱节点也在等待,并且负责唤醒后继线程
    2. 如果前驱节点放弃了或者取消了,那么一直查找到状态正常的一个节点,返回false,并进行acquireQueued的下一轮自旋
    3. 如果前驱节点状态正常,则修改前驱节点状态为SIGNAL,并继续进行acquireQueued的下一轮自旋

      1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/22353188/1633420811061-2f741775-60bd-441c-823f-fec996b4265f.png#height=186&id=DEZsS&margin=%5Bobject%20Object%5D&name=image.png&originHeight=372&originWidth=825&originalType=binary&ratio=1&size=24035&status=done&style=none&width=413)
  7. 如果到此时还没有获取到锁,那么执行parkAndCheckInterrupt阻塞当前线程(阻塞前一定找到状态为SIGNAL的前驱节点),进入等待队列

    1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/22353188/1633421005048-c370e220-67d1-46b7-9c63-a55fe4f6a1e3.png#height=193&id=e3npC&margin=%5Bobject%20Object%5D&name=image.png&originHeight=385&originWidth=824&originalType=binary&ratio=1&size=25882&status=done&style=none&width=412)
  • 出现竞争时的队列状态
    • 图中红色三角表示该 NodewaitStatus状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的
    • 其中第一个 Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

image.png

  • 如果此时又有其他线程过来竞争资源则队列状态变为

image.png

源码实现

  1. final void lock() {
  2. //尝试使用cas修改state,有竞争的时候会失败
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. //竞争失败,走AQS的获取锁方法
  7. acquire(1);
  8. }
  9. //这里是AQS 继承过来的方法
  10. public final void acquire(int arg) {
  11. //尝试一次tryAcquire,如果失败了执行addWaiter
  12. if (!tryAcquire(arg) &&
  13. //将线程加入队尾,并寻找前驱唤醒节点
  14. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
  15. //自我中断
  16. selfInterrupt();
  17. }
  18. }
  19. //这里是NonfairSync本身的方法
  20. protected final boolean tryAcquire(int acquires) {
  21. //调用非公平竞争
  22. return nonfairTryAcquire(acquires);
  23. }
  24. //这里是非公平竞争实现
  25. final boolean nonfairTryAcquire(int acquires) {
  26. final Thread current = Thread.currentThread();
  27. int c = getState();
  28. //如果此时竞态条件被释放了
  29. if (c == 0) {
  30. //cas修改state状态
  31. if (compareAndSetState(0, acquires)) {
  32. //设置线程属主
  33. setExclusiveOwnerThread(current);
  34. return true;
  35. }
  36. }
  37. // 如果竞态条件依然存在,判断是不是锁重入
  38. else if (current == getExclusiveOwnerThread()) {
  39. int nextc = c + acquires;
  40. if (nextc < 0) // overflow
  41. throw new Error("Maximum lock count exceeded");
  42. setState(nextc);
  43. return true;
  44. }
  45. //获取锁失败
  46. return false;
  47. }

解锁流程

Thread-0释放锁,进入 release流程,如果成功

  1. 设置 exclusiveOwnerThreadnull
  2. 设置state= 0

**Thread-0**释放锁后的状态
image.png

线程释放锁执行流程

  1. Thread-0释放锁,进入 release流程
  2. 执行Sync的tryRelease流程,正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了
    1. tryRelease执行成功后修改exclusiveOwnerThreadnull
    2. 修改state = 0,如果Thread-0 进行了锁重入,则state本次修改不为0,只是将锁重入次数减一

image.png

  1. tryRelease执行成功返回true 执行AQSunparkSuccessor流程,开始唤醒后继线程
    1. 将当前线程(Thread-0)的waitState状态修改为0

image.png
b. 找到当前线程的后继线程,并判断后继线程的waitState状态,如果后继线程为null,或者取消了,那么就从队尾往前找,找到后继线程
image.png
c. 唤醒找到的线程

源码实现

  1. //Sync的unlock
  2. public void unlock() {
  3. sync.release(1);
  4. }
  5. //Sync 继承自 AQS
  6. public final boolean release(int arg) {
  7. if (tryRelease(arg)) {
  8. Node h = head;
  9. if (h != null && h.waitStatus != 0)
  10. //唤醒后继节点
  11. unparkSuccessor(h);
  12. return true;
  13. }
  14. return false;
  15. }
  16. //Sync的tryRelease
  17. protected final boolean tryRelease(int releases) {
  18. int c = getState() - releases;
  19. //保险起见做的检验
  20. if (Thread.currentThread() != getExclusiveOwnerThread())
  21. throw new IllegalMonitorStateException();
  22. boolean free = false;
  23. if (c == 0) {
  24. free = true;
  25. setExclusiveOwnerThread(null);
  26. }
  27. setState(c);
  28. return free;
  29. }
  30. // AQS的实现
  31. private void unparkSuccessor(Node node) {
  32. //node为当前线程所在的结点。
  33. int ws = node.waitStatus;
  34. if (ws < 0){
  35. //置零当前线程所在的结点状态,允许失败。
  36. compareAndSetWaitStatus(node, ws, 0);
  37. }
  38. //找到下一个需要唤醒的结点s
  39. Node s = node.next;
  40. if (s == null || s.waitStatus > 0) {
  41. //如果为空或已取消
  42. s = null;
  43. for (Node t = tail; t != null && t != node; t = t.prev)
  44. // 当前节点的后继节点被取消了或者为空,则应该从后向前找状态正常的
  45. if (t.waitStatus <= 0)
  46. s = t;
  47. }
  48. if (s != null)
  49. //唤醒
  50. LockSupport.unpark(s.thread);
  51. }

可重入原理

重点在加锁时的else if 发现锁的持有者还是当前线程,就对状态进行累加,以及解锁时的getState() - releases直到state=0才算解锁成功

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
  11. else if (current == getExclusiveOwnerThread()) {
  12. // state++
  13. int nextc = c + acquires;
  14. if (nextc < 0) // overflow
  15. throw new Error("Maximum lock count exceeded");
  16. setState(nextc);
  17. return true;
  18. }
  19. return false;
  20. }
  21. //Sync的tryRelease
  22. protected final boolean tryRelease(int releases) {
  23. // state--
  24. int c = getState() - releases;
  25. //保险起见做的检验
  26. if (Thread.currentThread() != getExclusiveOwnerThread())
  27. throw new IllegalMonitorStateException();
  28. boolean free = false;
  29. // 支持锁重入, 只有 state 减为 0, 才释放成功
  30. if (c == 0) {
  31. free = true;
  32. setExclusiveOwnerThread(null);
  33. }
  34. setState(c);
  35. return free;
  36. }

可打断原理

可打断模式

  1. 如果没获取到锁,则进入doAcquireInterruptibly流程
  2. doAcquireInterruptibly流程与正常获取锁的流程(acquireQueued)几乎一致,不同的地方在于,acquireQueued在捕获到中断时,只是记录中断标记,然后继续自旋;而doAcquireInterruptibly捕获到中断直接抛出中断异常,不会再进行下一轮循环。

    1. //ReentrantLock#lockInterruptibly
    2. public void lockInterruptibly() throws InterruptedException {
    3. sync.acquireInterruptibly(1);
    4. }
    5. //继承自父类的方法
    6. public final void acquireInterruptibly(int arg)
    7. throws InterruptedException {
    8. if (Thread.interrupted())
    9. throw new InterruptedException();
    10. //如果没获取到锁
    11. if (!tryAcquire(arg))
    12. doAcquireInterruptibly(arg);
    13. }
    14. //继承自父类的方法
    15. private void doAcquireInterruptibly(int arg)throws InterruptedException {
    16. //当前线程封装成节点,并加入队尾
    17. final Node node = addWaiter(Node.EXCLUSIVE);
    18. boolean failed = true;
    19. try {
    20. for (;;) {
    21. final Node p = node.predecessor();
    22. if (p == head && tryAcquire(arg)) {
    23. setHead(node);
    24. p.next = null; // help GC
    25. failed = false;
    26. return;
    27. }
    28. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    29. // 在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入自旋,与不可打断相比,只是在此抛出异常
    30. throw new InterruptedException();
    31. }
    32. } finally {
    33. if (failed)
    34. cancelAcquire(node);
    35. }
    36. }

    不可打断模式

    在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了 ,不可打断模式的原理在上节已经说明,即acquireQueued在捕获到中断时,只是记录中断标记,然后继续自旋

    1. final boolean acquireQueued(final Node node, int arg) {
    2. //标记是否成功拿到资源
    3. boolean failed = true;
    4. try {
    5. //标记等待过程中是否被中断过
    6. boolean interrupted = false;
    7. //CAS自旋
    8. for (; ; ) {
    9. //拿到前驱节点
    10. final Node p = node.predecessor();
    11. //如果前驱是head,即该结点已成第二个节点,那么便有资格去尝试获取资源
    12. if (p == head && tryAcquire(arg)) {
    13. //拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
    14. setHead(node);
    15. // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
    16. p.next = null;
    17. // 成功获取资源
    18. failed = false;
    19. //返回等待过程中是否被中断过
    20. return interrupted;
    21. }
    22. //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
    23. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    24. //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
    25. interrupted = true;
    26. }
    27. } finally {
    28. // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
    29. if (failed)
    30. cancelAcquire(node);
    31. }
    32. }

    公平锁实现原理

    公平锁与非公平锁的区别就是在尝试获取锁的逻辑上,即在争抢锁的时候会调用hasQueuedPredecessors方法,该方法的作用就是查看是否有比当前线程等待时间更久的线程存在,如果有就不争抢锁,如果没有就竞争锁 ```java static final class FairSync extends Sync {

    1. private static final long serialVersionUID = -3000897897090466540L;
    2. final void lock() {
    3. acquire(1);
    4. }
    5. protected final boolean tryAcquire(int acquires) {
    6. final Thread current = Thread.currentThread();
    7. int c = getState();
    8. if (c == 0) {
    9. //主要区别处,在尝试获取锁之前,查看是否有比当前线程等待时间更久的线程
    10. //或者说查看当前线程是否有前驱线程(正常状态)
    11. if (!hasQueuedPredecessors() &&
    12. compareAndSetState(0, acquires)) {
    13. setExclusiveOwnerThread(current);
    14. return true;
    15. }
    16. }
    17. else if (current == getExclusiveOwnerThread()) {
    18. int nextc = c + acquires;
    19. if (nextc < 0)
    20. throw new Error("Maximum lock count exceeded");
    21. setState(nextc);
    22. return true;
    23. }
    24. return false;
    25. }

    }

    public final boolean hasQueuedPredecessors() { //获取尾结点

    1. Node t = tail;

    //获取头结点

    1. Node h = head;
    2. Node s;
    3. return
    4. //h!=t说明还有节点存在
    5. h != t &&
    6. //查看头结点后有没有节点
    7. ((s = h.next) == null ||
    8. //头结点的后继节点不为空,且头结点的后继线程不是当前线程
    9. s.thread != Thread.currentThread());

    }

  1. <a name="ferQP"></a>
  2. # 条件变量实现原理
  3. 每个条件变量其实就对应着一个等待队列,其实现类是 `ConditionObject`
  4. <a name="fL2sG"></a>
  5. ## await 流程
  6. ```java
  7. public final void await() throws InterruptedException {
  8. if (Thread.interrupted())
  9. throw new InterruptedException();
  10. Node node = addConditionWaiter();
  11. int savedState = fullyRelease(node);
  12. int interruptMode = 0;
  13. while (!isOnSyncQueue(node)) {
  14. LockSupport.park(this);
  15. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  16. break;
  17. }
  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  19. interruptMode = REINTERRUPT;
  20. if (node.nextWaiter != null) // clean up if cancelled
  21. unlinkCancelledWaiters();
  22. if (interruptMode != 0)
  23. reportInterruptAfterWait(interruptMode);
  24. }

开始 Thread-0 持有锁,调用 await,进入 ConditionObjectaddConditionWaiter流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

  1. 进入addConditionWaiter流程,添加一个 Node至等待队列

    1. private Node addConditionWaiter() {
    2. //最后一个等待节点
    3. Node t = lastWaiter;
    4. if (t != null && t.waitStatus != Node.CONDITION) {
    5. // 所有已取消的 Node 从队列链表删除,并更新lastWaiter
    6. unlinkCancelledWaiters();
    7. t = lastWaiter;
    8. }
    9. // 创建一个关联当前线程的新 Node, 添加至队列尾部
    10. Node node = new Node(Thread.currentThread(), Node.CONDITION);
    11. if (t == null)
    12. firstWaiter = node;
    13. else
    14. t.nextWaiter = node;
    15. lastWaiter = node;
    16. return node;
    17. }

    image.png

  2. 进入 AQS 的 fullyRelease流程,释放同步器上的锁,可能存在锁重入,需要将 state 全部释放

    1. final int fullyRelease(Node node) {
    2. boolean failed = true;
    3. try {
    4. int savedState = getState();
    5. //把所有的锁重入值全部释放了
    6. if (release(savedState)) {
    7. failed = false;
    8. return savedState;
    9. } else {
    10. throw new IllegalMonitorStateException();
    11. }
    12. } finally {
    13. if (failed)
    14. node.waitStatus = Node.CANCELLED;
    15. }
    16. }

    image.png

  3. 进入isOnSyncQueue逻辑,判断node节点是否在同步对列中,

    1. 如果不在同步队列,则unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1竞争成功

image.png
b. 判断此次线程的唤醒是否因为线程被中断, 若是被中断,转移节点到同步对列中

signal 流程

  1. //唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
  2. public final void signal() {
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }
  9. //唤醒 - 将没取消的第一个节点转移至 AQS 队列
  10. private void doSignal(Node first) {
  11. do {
  12. if ( (firstWaiter = first.nextWaiter) == null)
  13. lastWaiter = null;
  14. first.nextWaiter = null;
  15. } while (
  16. // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环
  17. !transferForSignal(first) &&
  18. // 队列还有节点
  19. (first = firstWaiter) != null);
  20. }

假设 Thread-1要来唤醒 Thread-0
进入 ConditionObjectdoSignal流程,取得等待队列中第一个 Node,即Thread-0 所在 Node``
image.png
执行 transferForSignal流程,将该 Node加入 AQS队列尾部,将``Thread-0waitStatus改为 0,Thread-3waitStatus改为 -1
image.png
Thread-1 释放锁,进入 unlock流程