入口

  1. public static void main(String[] args) {
  2. ReentrantLock lock =new ReentrantLock();
  3. lock.lock();
  4. System.out.println("Test.main");
  5. lock.unlock();
  6. }
  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }

ReentrantLock 无参构造器默认使用非公平同步器 NonfairSync 进行锁的相关操作, NonfairSyncReentrantLock 的静态内部类,继承自抽象静态内部类 Sync ,而Sync又继承 AbstractQueuedSynchronizer ,
这个类是java并发包中关于Lock,CountdownLatch,semaphore等实现的基础。

  1. static final class NonfairSync extends Sync
  2. abstract static class Sync extends AbstractQueuedSynchronizer

下面看下ReentrantLock的lock方法的实现

  1. //ReenTrantLock
  2. public void lock() {
  3. sync.lock(); //委托给syn,调用sync的lock方法,默认sync = NonfairSync()
  4. }
  5. //下面是非公平和公平两种模式获取锁的异同
  6. // NonfairSync 一上来就直接去cas抢锁,抢到了,就设置排他模式,
  7. // 抢不到再调acquire(1),跟FairSyn一样
  8. //NonfairSync
  9. final void lock() {
  10. if (compareAndSetState(0, 1))
  11. setExclusiveOwnerThread(Thread.currentThread());
  12. else
  13. acquire(1);
  14. }
  15. //FairSyn
  16. final void lock() {
  17. acquire(1);
  18. }
  1. 使用 compareAndSetState 方法用CAS去设置状态,当获取锁成功后,调用 setExclusiveOwnerThread 方式设置当前线程独占此锁

    下面看下 compareAndSetState 方法,这个比较简单,就是用UnSafe类型做原子的比较替换操作,这个方法是 属于AbstractQueuedSynchronizer

    1. // AbstractQueuedSynchronizer
    2. protected final boolean compareAndSetState(int expect, int update) {
    3. // See below for intrinsics setup to support this
    4. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    5. }
  2. 当设置状态失败,表明有其他线程在争抢此锁,并且争抢成功了,此时将走else分支,调用 acquire(1) 方法

    下面看acquire()方法的代码,这个方法中的操作是AQS的重点代码

    aqs.acquire(int arg)

    以独占(排他)的模式去尝试获取锁,如果成功获取锁,则返回;获取失败,则会线程阻塞,进入等待锁的队列,等待获取到锁的线程释放锁后通知

    1. //以独占的方式取
    2. public final void acquire(int arg) {
    3. //tryAcquire是个钩子方法,由子类去实现 ,
    4. //NonfairSync会尝试获取锁,FairSync看有前置节点没,有排队
    5. if (!tryAcquire(arg) &&
    6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    7. selfInterrupt();
    8. }

    tryAcquire(int arg)

    接下来看一下ReentrantLock中静态类Syn默认子类NonFairSyn的实现方式

    1. //NonfairSync
    2. protected final boolean tryAcquire(int acquires) {
    3. //调用父类Sync中的nonfairTryAcquire方法获取锁
    4. return nonfairTryAcquire(acquires);
    5. }
    1. //Sync
    2. final boolean nonfairTryAcquire(int acquires) {
    3. final Thread current = Thread.currentThread();
    4. //获取父类AQS的state属性值,它是volatile修饰,这个属性是所有同步器的核心,就是由它控制锁
    5. //排他锁,共享锁,信号量等实现全部靠它
    6. int c = getState();
    7. //stat == 0 标识没有线程获取到锁
    8. if (c == 0) {
    9. //cas去获取锁,获取成功后,设置当前线程独占
    10. if (compareAndSetState(0, acquires)) {
    11. setExclusiveOwnerThread(current);
    12. return true;
    13. }
    14. }
    15. //stat != 0 标识有线程已经获取到锁了,然后获取霸占锁的线程是否是当前线程
    16. else if (current == getExclusiveOwnerThread()) {
    17. //如果是,则stat加1,同一个线程,可以多次获取锁
    18. int nextc = c + acquires;
    19. if (nextc < 0) // overflow
    20. throw new Error("Maximum lock count exceeded");
    21. setState(nextc);
    22. return true;
    23. }
    24. //如果是其他线程则返回false
    25. return false;
    26. }

    此时 acquire(int arg) 的组合条件第一项tryAcquire(int arg)尝试获取一次锁执行完毕,如果成功获取到锁,就已经返回了,假定没有获取到锁,看下 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 中的逻辑

    addWaiter(Node.EXCLUSIVE)

    此方法就是把线程封装成一个Node实例,往等待锁的队列中添加此实例对象(当前线程)

    1. private Node addWaiter(Node mode) {
    2. //创建一个新的Node节点,并传递当前线程以及独占模式两个参数
    3. Node node = new Node(Thread.currentThread(), mode);
    4. // Try the fast path of enq; backup to full enq on failure
    5. Node pred = tail;
    6. if (pred != null) {
    7. //如果等待锁的队列队尾不为空,则设置当前节点的前置节点为原队尾节点
    8. node.prev = pred;
    9. //cas方式设置刚创建的节点为队尾节点(入队),如果入队成功,直接返回
    10. if (compareAndSetTail(pred, node)) {
    11. pred.next = node;
    12. return node;
    13. }
    14. }
    15. //如果等待锁队列为空或者cas设置队尾失败,会调用enq()入队方法
    16. enq(node);
    17. return node;
    18. }

    enq(Node node)入队

    1. private Node enq(final Node node) {
    2. //死循环直到入队成功
    3. for (;;) {
    4. Node t = tail;
    5. //队列为空,需要初始化队列
    6. if (t == null) { // Must initialize
    7. //注意:初始化的队列首位Node节点是一个空节点,它没有封装任何线程
    8. if (compareAndSetHead(new Node()))
    9. tail = head;
    10. } else {
    11. //CAS直到入队
    12. node.prev = t;
    13. if (compareAndSetTail(t, node)) {
    14. t.next = node;
    15. return t;
    16. }
    17. }
    18. }
    19. }

    11345047-4914077e2ff76765.webp

    acquireQueued(Node node,int arg)

    此方法用于

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. //获取到前一个节点p
    7. final Node p = node.predecessor();
    8. //如果前置节点是头节点,则尝试获取锁,因为头节点可能是初始化的空节点
    9. if (p == head && tryAcquire(arg)) {
    10. //获取锁成功后,将当前节点设置为头节点,被将node的thread,prev设置为空
    11. //即头节点需是空节点,也就是持有锁的节点
    12. setHead(node);
    13. //原来的头节点的next节点设置null,因为头节点已经换成抢到锁的node节点了
    14. //原来的头节点p就是垃圾了,等待gc回收
    15. p.next = null; // help GC
    16. failed = false;
    17. //获取锁成功,那么当前线程就不需要中断阻塞了
    18. return interrupted;
    19. }
    20. //如果前一个节点不是头节点或者获取锁失败,调用shouldParkAfterFailedAcquirefangfa
    21. //设置前置节点为可唤醒状态(waitStatus=Node.SIGNAL),并剔除前置为waitStatus==Node
    22. //.CANCAELLED的节点
    23. if (shouldParkAfterFailedAcquire(p, node) &&
    24. //阻塞当前线程
    25. //当线程被唤醒后,如果线程是被打断的,则设置interrutted为true
    26. //又开始for循环去获取锁,根据if条件,所以只能是第二节点(pre==head)才可能调用
    27. //tryAcquire()获取锁,所以,jvm唤醒线程应该是唤醒队首的线程
    28. parkAndCheckInterrupt())
    29. interrupted = true;
    30. }
    31. } finally {
    32. if (failed)
    33. //这只可能异常的情况下才会执行到,获取到锁failed = true
    34. //没有获取锁,则线程阻塞,所以只可能没有获取到锁,但是过程异常了
    35. cancelAcquire(node);
    36. }
    37. }

    shouldParkAfterFailedAcquire(Node pre,Node node)

    此方法的作用是设置当前节点的前置节点的waitStatus为Node.SIGNAL状态,因为只有持有锁的线程释放锁后,通知等待锁的线程抢锁,被通知到的线程对应的Node的waitStatus必须为<=0,

  • SIGNAL(-1):标识有后继节点马上进入阻塞,需要当前节点唤醒它,它标识的不是本节点的状态,而是后继节点的状态
  • CANCELLED(1):当线程阻塞的timeout到期,或者线程被interrupt后,节点的状态变更为CANCELLED,当线程是此状态后,当再次调用阻塞API阻塞此线程时,此线程不会被阻塞
  • CONDITION(-2):标识当前线程处于一个条件队列(condition queue)中,条件队列中的节点(线程)在转移到获取锁的同步队列前不会唤醒,当它转移到同步队列中等待锁时,waitStatus从-2变更为0
  • PROPAGATE(-3):共享锁实现时用到,当共享锁释放锁,用来设置head节点,然后传递给其他节点
    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. // 检查前置节点的waitStatus状态
    3. int ws = pred.waitStatus;
    4. //如果状态waitStatus是-1 标识前置节点的waitStatus已经被设置为需要唤醒,也就是
    5. //锁持有者释放锁后,需要唤醒一个等待锁的线程,这个被唤醒的线程的waitStatus需要是<=0
    6. //如果有后置节点进来,前置节点的waitstatus==-1 ,否则一直是0
    7. if (ws == Node.SIGNAL)
    8. /*
    9. * This node has already set status asking a release
    10. * to signal it, so it can safely park.
    11. */
    12. return true;
    13. //waitStatus == 1(Node.CANCELLED)标识此线程取消抢锁
    14. if (ws > 0) {
    15. /*
    16. * Predecessor was cancelled. Skip over predecessors and
    17. * indicate retry.
    18. */
    19. do {
    20. //将队列前置节点waitStatus == 1的Node节点剔除队列
    21. node.prev = pred = pred.prev;
    22. } while (pred.waitStatus > 0);
    23. pred.next = node;
    24. } else {
    25. /*
    26. * waitStatus must be 0 or PROPAGATE. Indicate that we
    27. * need a signal, but don't park yet. Caller will need to
    28. * retry to make sure it cannot acquire before parking.
    29. */
    30. //将前置节点设置为Node.SIGNAL,设置成功后还需要调用者再调一次确保线程被阻塞前没有成为对首
    31. //因为队首节点可以抢锁了,避免不必要的阻塞线程
    32. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    33. }
    34. return false;
    35. }

    parkAndCheckInterrupt()

    这个方法很简单,就是阻塞当前线程
    1. private final boolean parkAndCheckInterrupt() {
    2. //用到了LockSupport工具类,将线程阻塞
    3. //此线程要想唤醒,除非1,有其他线程调用LockSupport.unPark()方法,2 有线程将当前线程中断,
    4. //3,虚假唤醒
    5. LockSupport.park(this);
    6. return Thread.interrupted();
    7. }

    cancelAcquire(Node node)

  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn't exist
  3. if (node == null)
  4. return;
  5. //设置node节点的线程为空
  6. node.thread = null;
  7. // Skip cancelled predecessors
  8. //取消的node清除出队列
  9. Node pred = node.prev;
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. // predNext is the apparent node to unsplice. CASes below will
  13. // fail if not, in which case, we lost race vs another cancel
  14. // or signal, so no further action is necessary.
  15. Node predNext = pred.next;
  16. // Can use unconditional write instead of CAS here.
  17. // After this atomic step, other Nodes can skip past us.
  18. // Before, we are free of interference from other threads.
  19. //设置node的waitstatus为取消状态
  20. node.waitStatus = Node.CANCELLED;
  21. // If we are the tail, remove ourselves.
  22. //如果是队尾元素,则移出队列
  23. if (node == tail && compareAndSetTail(node, pred)) {
  24. compareAndSetNext(pred, predNext, null);
  25. } else {
  26. // If successor needs signal, try to set pred's next-link
  27. // so it will get one. Otherwise wake it up to propagate.
  28. //如果不是队尾元素,并且不是队首节点,且前置节点为SIGNAL(如果不是,设置为SIGNAL)
  29. //然后将前后节点连接起来
  30. int ws;
  31. if (pred != head &&
  32. ((ws = pred.waitStatus) == Node.SIGNAL ||
  33. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  34. pred.thread != null) {
  35. Node next = node.next;
  36. if (next != null && next.waitStatus <= 0)
  37. compareAndSetNext(pred, predNext, next);
  38. } else {//如果是头节点,负责唤醒下一个节点
  39. unparkSuccessor(node);
  40. }
  41. //非队尾的node取消后,设置后继节点为自己
  42. node.next = node; // help GC
  43. }
  44. }

unparkSuccessor(Node node)

唤醒后续等待队列中队首的线程(如果没取消的话Node.waitStatus == Node.CANCENLLED),如果队首节点取消的话,则从队尾遍历最靠前的没有取消的线程,唤醒它。
注(细节):
如果后继节点更新完head节点的waitStatus为SIGNAL,然后再一次tryAcquire(1)没有获取到锁,然后在挂起线程前head节点释放锁,然后调用此方法唤醒后继节点,那就是LockSupport.unpark(t)先于LockSupport.park(this)执行,那么后继节点不会再阻塞
这个方法也是lock.release(1)最终调用的方法

  1. //通过上文分析可知node节点为head节点
  2. private void unparkSuccessor(Node node) {
  3. /*
  4. * If status is negative (i.e., possibly needing signal) try
  5. * to clear in anticipation of signalling. It is OK if this
  6. * fails or if status is changed by waiting thread.
  7. */
  8. int ws = node.waitStatus;
  9. if (ws < 0)
  10. //如果ws < 0 ,把head节点的waitststus更新为0
  11. compareAndSetWaitStatus(node, ws, 0);
  12. /*
  13. * Thread to unpark is held in successor, which is normally
  14. * just the next node. But if cancelled or apparently null,
  15. * traverse backwards from tail to find the actual
  16. * non-cancelled successor.
  17. */
  18. Node s = node.next;
  19. //如果下一个节点为空,或者处于取消状态,则从后往前遍历
  20. //找到最前面的等待唤醒的线程
  21. //为啥从后遍历??
  22. //cancelAcquire()方法中当取消的node为不是tail时,为了help gc 设置了node.next = node
  23. //所以遍历时不能使用next引用,得用prev引用,所以就得从后往前遍历
  24. if (s == null || s.waitStatus > 0) {
  25. s = null;
  26. for (Node t = tail; t != null && t != node; t = t.prev)
  27. if (t.waitStatus <= 0)
  28. s = t;
  29. }
  30. //将之唤醒
  31. if (s != null)
  32. LockSupport.unpark(s.thread);
  33. }

在并发环境下,加锁和解锁需要以下三个部件的协调:

  1. 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
  2. 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程
  3. 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用

入队

线程1,调用lock(),获取到锁,此时队列尚未初始化,head和tail都还是null
11345047-eb20a77145490d51.png
线程2调用lock(),tryAcquire(1)失败,addWaiter(Node.EXCLUSIVE),接着调用enq(final Node node) 初始化队列
并将当前线程入队 ,见enq(Node node),接着在shouldParkAfterFailedAcquire方法中更新head.waitStatus=-1
11345047-4bca976662aa851d.webp
线程3再进来
11345047-50dac2c8b1f52f7f.webp

带有过期时间的lock()

tryLock(long timeout, TimeUnit unit)

  1. //ReenTrantLock
  2. public boolean tryLock(long timeout, TimeUnit unit)
  3. throws InterruptedException {
  4. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  5. }
  6. AQS
  7. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  8. throws InterruptedException {
  9. if (Thread.interrupted())
  10. throw new InterruptedException();
  11. //上来就先获取锁,获取不到才走doAcquireNanos
  12. return tryAcquire(arg) ||
  13. doAcquireNanos(arg, nanosTimeout);
  14. }
  15. //AQS
  16. private boolean doAcquireNanos(int arg, long nanosTimeout)
  17. throws InterruptedException {
  18. if (nanosTimeout <= 0L)
  19. return false;
  20. final long deadline = System.nanoTime() + nanosTimeout;
  21. final Node node = addWaiter(Node.EXCLUSIVE);
  22. boolean failed = true;
  23. try {
  24. for (;;) {
  25. final Node p = node.predecessor();
  26. if (p == head && tryAcquire(arg)) {
  27. //setHead操作:
  28. // head = node;node.thread = null;node.prev = null;
  29. //当node获取锁时将自身设置head节点,此时Node的属性next还没设置为null?
  30. // 因为释放锁的时候还需要next引用找到下一个需要唤醒的节点
  31. setHead(node);
  32. p.next = null; // help GC 新的head节点接棒后,旧head最后一个next引用置空
  33. failed = false;
  34. return true;
  35. }
  36. nanosTimeout = deadline - System.nanoTime();
  37. if (nanosTimeout <= 0L)
  38. return false;
  39. if (shouldParkAfterFailedAcquire(p, node) &&
  40. nanosTimeout > spinForTimeoutThreshold)
  41. //区别是阻塞一定时间,被唤醒有两种方式,1,时间到,2时间未到,head节点唤醒
  42. LockSupport.parkNanos(this, nanosTimeout);
  43. if (Thread.interrupted())
  44. throw new InterruptedException();
  45. }
  46. } finally {
  47. if (failed)
  48. cancelAcquire(node);
  49. }
  50. }

可以打断的lockInterruptily()

  1. //前面过程都差不多,也是上来就tryAcquir(1),失败就走下面的逻辑
  2. private void doAcquireInterruptibly(int arg)
  3. throws InterruptedException {
  4. //加入队列
  5. final Node node = addWaiter(Node.EXCLUSIVE);
  6. boolean failed = true;
  7. try {
  8. for (;;) {
  9. final Node p = node.predecessor();
  10. if (p == head && tryAcquire(arg)) {
  11. setHead(node);
  12. p.next = null; // help GC
  13. failed = false;
  14. return;
  15. }
  16. if (shouldParkAfterFailedAcquire(p, node) &&
  17. parkAndCheckInterrupt())
  18. //打断后抛异常
  19. throw new InterruptedException();
  20. }
  21. } finally {
  22. if (failed)
  23. //如果抛异常然后走这,取消当前节点,设置当前节点CANCELLED,踢出队列或者唤醒后继节点
  24. cancelAcquire(node);
  25. }
  26. }

阻塞在condition上

  1. ////ReenTrantLock
  2. public Condition newCondition() {
  3. return sync.newCondition();
  4. }
  5. //ReenTrantLock.Sync
  6. final ConditionObject newCondition() {
  7. return new ConditionObject();
  8. }
  9. //AQS
  10. //从ConditionObject 可以看出AQS实现的Condition,内部同样维护了一个队列
  11. //当多个线程调用condition的await()方法后,都放入此等待队列中
  12. public class ConditionObject implements Condition, java.io.Serializable {
  13. private static final long serialVersionUID = 1173984872572414699L;
  14. /** First node of condition queue. */
  15. private transient Node firstWaiter;
  16. /** Last node of condition queue. */
  17. private transient Node lastWaiter;
  18. //其他代码...
  19. }

Condition.await()

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. //添加节点到条件队列
  5. Node node = addConditionWaiter();
  6. //释放全部的信号/锁,这里以ReentrantLock.newCondition()为例,
  7. //因为lock是可重入的,所以stat可能不止是1,所以全部释放
  8. //并将释放前state的值保存savedState中
  9. int savedState = fullyRelease(node);
  10. int interruptMode = 0;
  11. // 这里退出循环有两种情况,之后再仔细分析
  12. // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
  13. // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
  14. while (!isOnSyncQueue(node)) {
  15. LockSupport.park(this);
  16. //假设当前线程被signal()唤醒,继续执行,且没有被打断(打断细节暂不考虑)
  17. //又进入while循环判断,此时跳出循环
  18. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  19. break;
  20. }
  21. //进入获取锁,阻塞的死循环中了,获取锁,使用释放前锁持有的数量savedState
  22. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  23. interruptMode = REINTERRUPT;
  24. if (node.nextWaiter != null) // clean up if cancelled
  25. unlinkCancelledWaiters();
  26. if (interruptMode != 0)
  27. reportInterruptAfterWait(interruptMode);
  28. }
  29. private Node addConditionWaiter() {
  30. Node t = lastWaiter;
  31. // If lastWaiter is cancelled, clean out.
  32. if (t != null && t.waitStatus != Node.CONDITION) {
  33. // 如果conditon queue的最后一个节点不是CONDITION状态,则把非CONDITION状态的节点
  34. // 全部踢出condition队列
  35. unlinkCancelledWaiters();
  36. t = lastWaiter;
  37. }
  38. //创建新CONDITION节点,新创建的Node节点的pre和next都是null,即不在等待锁的同步队列
  39. //它本身也是持有锁后才走到这的
  40. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  41. //为啥这里往队列加元素没有用cas?
  42. //因为此方法要想执行,首先得获取到condition附着的锁,所以不需要cas
  43. if (t == null)
  44. firstWaiter = node;//condition queue为空
  45. else
  46. t.nextWaiter = node; //放到队列尾部
  47. lastWaiter = node;
  48. return node;
  49. }
  50. //condition queue使用了Node节点的nextWaiter属性构成一个单向链表
  51. //踢出队列的动作就是将非CONDITION的的node的nextWaiter属性置空,将前一个CONDITION状态的node的
  52. //nextWaiter指向node的下一个节点(如果也需要踢出,则更换指向下下一个节点,依次类推)
  53. private void unlinkCancelledWaiters() {
  54. Node t = firstWaiter;
  55. //trail是遍历到最新到waitStatus==CONDITION(-2)的node,
  56. Node trail = null;
  57. //t初始为第一个节点,循环逻辑执行完毕后将t指向下一个节点,只要队列不为空,就一直循环
  58. while (t != null) {
  59. //把下一个节点先取出来
  60. Node next = t.nextWaiter;
  61. //比较当前节点t的waitStatus是否是-2
  62. if (t.waitStatus != Node.CONDITION) {
  63. //如果走到这,说明当前节点t不再需要等待condition条件,将他的nextWaiter置空
  64. t.nextWaiter = null;
  65. if (trail == null)
  66. //如果遍历到的节点都是需要踢出condition的队列的,会走到这,直到碰到第一个
  67. //真正需要等待condition的节点,将队列头引用firstWaiter指向它
  68. //这里是提前将队首引用指向下一个节点(当前节点肯定不是)
  69. firstWaiter = next;
  70. else
  71. trail.nextWaiter = next;
  72. if (next == null)
  73. //如果队列到尾了,就将队尾引用指向trail,它是最后一个等待的节点
  74. lastWaiter = trail;
  75. }
  76. else
  77. //走到这,说明当前节点是需要等待的节点,会赋给trail
  78. trail = t;
  79. //t当前节点 这里把下一个节点赋给当前节点,然后继续循环判断
  80. t = next;
  81. }
  82. }
  83. // 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
  84. // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
  85. // 这个方法就是判断 node 是否已经移动到阻塞队列了
  86. final boolean isOnSyncQueue(Node node) {
  87. // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
  88. // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
  89. // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
  90. if (node.waitStatus == Node.CONDITION || node.prev == null)
  91. return false;
  92. // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
  93. if (node.next != null)
  94. return true;
  95. // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,
  96. //否则就是不在阻塞队列
  97. // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
  98. // AQS 的入队方法,首先设置的是 node.prev 指向 tail,
  99. // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。
  100. return findNodeFromTail(node);
  101. }

condition.signal()

  1. public final void signal() {
  2. //是否持有排他锁
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. //唤醒condition队列的第一个节点
  8. doSignal(first);
  9. }
  10. private void doSignal(Node first) {
  11. do {
  12. //如果队列中还有等待的node,则把下一个节点设置为头节点,如果没有,则把尾引用置空
  13. if ( (firstWaiter = first.nextWaiter) == null)
  14. lastWaiter = null;
  15. //一上来就把队列头的节点给干掉了,因为它马上就被唤醒了,所以要踢出conndition的队列
  16. first.nextWaiter = null;
  17. } while (!transferForSignal(first) &&
  18. (first = firstWaiter) != null);//如果队列头唤醒失败,且队列中还有其他节点
  19. //则继续唤醒后继节点
  20. }
  21. final boolean transferForSignal(Node node) {
  22. /*
  23. * If cannot change waitStatus, the node has been cancelled.
  24. */
  25. //CAS设置waitStatus,设置失败说明节点取消了
  26. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  27. return false;
  28. /*
  29. * Splice onto queue and try to set waitStatus of predecessor to
  30. * indicate that thread is (probably) waiting. If cancelled or
  31. * attempt to set waitStatus fails, wake up to resync (in which
  32. * case the waitStatus can be transiently and harmlessly wrong).
  33. */
  34. //熟悉的代码来了,将节点放入获取锁的同步队列,返回前一个节点
  35. Node p = enq(node);
  36. int ws = p.waitStatus;
  37. //如果前一个节点的waitSatus > 0 就是CANCELLED 或者设置前置节点SIGNAL失败
  38. //则将当前节点唤醒
  39. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  40. LockSupport.unpark(node.thread);
  41. return true;
  42. }

11345047-fd5bb5902a440c85.webp
Condition总结:
1,在使用 condition 时,必须先持有相应的锁
2,ReentrantLock 实例可以通过调用多次 newCondition 产生多个Condition类型的 ConditionObject 的实例,每一个实例维护了一个自己的condition队列
3,调用condition的await()方法时,把当前线程包装成Node对象放入condition 队列,释放锁,等待其他线程signal()/signalAll()
4,当处于其他线程调用signal()/signalAll()唤醒 condition队列的node时,将其放入获取锁的同步队列
5,被唤醒的线程从阻塞处开始执行,尝试去抢锁,就进入抢不到锁,阻塞;抢到了锁,执行代码,释放锁,通知后置节点的逻辑了。

参考https://www.jianshu.com/p/89132109d49d