AQS源码分析 - 图1

结点状态waitStatus

这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

acquire(int)

函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

tryAcquire(int)

尝试去获取独占资源,如果获取成功,则直接返回true,否则直接返回false。

  1. //具体实现交由具体的同步器根据业务需求去实现
  2. protected boolean tryAcquire(int arg) {
  3. throw new UnsupportedOperationException();
  4. }

addWaiter(Node)
  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(mode);
  3. for (;;) { //自旋
  4. //获得尾结点
  5. Node oldTail = tail;
  6. if (oldTail != null) { //如果尾节点不为空说明现在队列中已有结点,直接放入到队尾
  7. node.setPrevRelaxed(oldTail);
  8. if (compareAndSetTail(oldTail, node)) { //通过cas将尾节点修改为node
  9. oldTail.next = node;
  10. return node;
  11. }
  12. } else {//如果尾节点为空说明队列中没有结点,需要初始化
  13. initializeSyncQueue();
  14. }
  15. }
  16. }

acquireQueued(Node,int)

流程:

  1. 结点进入队尾,检查状态,找到安全休息点
  2. 调用park进入waitting状态,等待unpark()或interrupt()唤醒自己
  3. 被唤醒后,查看是否可以获取资源,如果拿到,head指向当前结点,并返回从入队拿到号的整个过程中是否被中断过;如果没拿到,继续流程1.

通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入到等待队列尾部了。此时==线程进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己拿到资源后,再去做其他事==。就如同医院排队拿号

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean interrupted = false; //标记是否被中断过
  3. try {
  4. //自旋!!
  5. for (;;) {
  6. //拿到前驱结点
  7. final Node p = node.predecessor();
  8. //如果前驱是head结点,即该结点是第二个结点并且能够获取到资源
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node);//将当前结点设置为头结点
  11. p.next = null; // help GC
  12. return interrupted;
  13. }
  14. //如果可以进入等待状态,则返回true,进入方法内部,该if条件里面的方法是去寻找它前面最近
  15. //的一个WaitStatus为Signal状态的结点,并清理掉那些状态为Cancle的结点和将其余状态的
  16. //结点的waitStatus修改为Signal。从而这里一般会进行两次判断,实现了自旋。
  17. if (shouldParkAfterFailedAcquire(p, node))
  18. interrupted |= parkAndCheckInterrupt();
  19. }
  20. } catch (Throwable t) {
  21. cancelAcquire(node);
  22. if (interrupted)
  23. selfInterrupt();
  24. throw t;
  25. }
  26. }

shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态,如果线程状态转换不熟,可以参考Thread详解),万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. //获取前驱结点的状态
  3. int ws = pred.waitStatus;
  4. if (ws == Node.SIGNAL) //前驱结点在释放时会通知当前结点,当前结点可以安全地进入等待状态
  5. /*
  6. * This node has already set status asking a release
  7. * to signal it, so it can safely park.
  8. */
  9. return true;
  10. if (ws > 0) {
  11. /*
  12. * Predecessor was cancelled. Skip over predecessors and
  13. * indicate retry.
  14. */
  15. do { //前驱结点被取消,此时需要移除这些结点
  16. node.prev = pred = pred.prev;
  17. } while (pred.waitStatus > 0);
  18. pred.next = node;
  19. } else {
  20. /*
  21. * waitStatus must be 0 or PROPAGATE. Indicate that we
  22. * need a signal, but don't park yet. Caller will need to
  23. * retry to make sure it cannot acquire before parking.
  24. */
  25. //如果前驱结点正常(必须为0或者PROPAGATE),表明我们需要信号,故将前驱的状态改为SIGNAL状
  26. //态
  27. pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
  28. }
  29. return false;
  30. }

parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。

acquire()小结

源码:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

流程:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
    AQS源码分析 - 图2

release(int)

release方法是独占模式下线程释放共享资源的顶层入口。他会释放指定量的资源,如果彻底释放了,它会唤醒等待队列里的其他线程来获取资源。值得注意的是:release()方法是根据tryRelease()方法的返回值来判断该线程是否已经完成资源释放了,同步器在设计tryRelease()的时候要明确这一点!!

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head; //找到头结点
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h); //唤醒等待队列里的下一个线程
  6. return true;
  7. }
  8. return false;
  9. }

tryRelease(int)

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryReleae()方法都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但需要注意它的返回值,release()是根据tryRelease()的返回值来判断该线程时候已经完成资源释放的,所以在自定义同步器实现时,如果资源彻底释放(state=0),返回true,否则返回false。

  1. protected boolean tryRelease(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

unparkSuccessor(Node)

此方法用于唤醒等待队列中最前边的那个未放弃的线程。下面是源码:

  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus; //获取当前线程所在的结点的状态
  3. if (ws < 0)
  4. node.compareAndSetWaitStatus(ws, 0);
  5. Node s = node.next; //找到一个需要唤醒的结点s
  6. if (s == null || s.waitStatus > 0) { //如果为空或已取消
  7. s = null;
  8. //从后往前找到队列中第一个未放弃的线程,这里从后往前寻找主要是由于addWaiter方法造成的
  9. //在addWaiter方法中后继指向前驱的结点是由CAS操作保证线程安全的,而CAS操作之后
  10. //oldtail.next = node之前,可能会有其他线程进来,因此从后往前找可以保证一定能遍历所有结
  11. //点
  12. for (Node p = tail; p != node && p != null; p = p.prev)
  13. if (p.waitStatus <= 0)
  14. s = p;
  15. }
  16. if (s != null)
  17. LockSupport.unpark(s.thread);//唤醒
  18. }

和acquireQueued()联系起来,s被唤醒后,进入if(p==head&&tryAcquire(arg))的判断(即使p!=head也没关系,它会进入shouldParkAfterFailedAcquire()寻找一个安全点,这里既然s已经是等待队列中最前边的那个线程了,就会将s前边的失效结点全部给删除,下次自旋p==head就成立了)

release()小结

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了共享资源,则它会唤醒等待队列里的其他线程

acquireShared()

流程:

  1. tryAcquireShared()尝试获取资源,成功则直接返回
  2. 失败则通过doAcquireShared()方法进入等待队列,直到获取资源成功为止

该方法是共享模式下线程获取共享资源的顶层入口,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取成功,整个过程中断忽略,源码如下:

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0) //负值代表获取失败
  3. doAcquireShared(arg);
  4. }

doAcquireShared(int)

此方法用于将当前线程加入到等待队列尾部休息,直到其他线程释放资源唤醒自己,成功拿到资源后才返回,源码如下:

  1. private void doAcquireShared(int arg) {
  2. //加入等待队列队尾
  3. final Node node = addWaiter(Node.SHARED);
  4. //等待过程中是否出现过中断
  5. boolean interrupted = false;
  6. try {
  7. for (;;) {
  8. final Node p = node.predecessor();
  9. if (p == head) {
  10. int r = tryAcquireShared(arg); //尝试获取资源
  11. if (r >= 0) {
  12. setHeadAndPropagate(node, r); //将head指向自己,还有剩余资源可以再唤醒之后的线程
  13. p.next = null; // help GC
  14. return;
  15. }
  16. }
  17. if (shouldParkAfterFailedAcquire(p, node)) //将当前线程连接到队列中最后一个处于有效等待状态的线程的后面
  18. interrupted |= parkAndCheckInterrupt();
  19. }
  20. } catch (Throwable t) {
  21. cancelAcquire(node);
  22. throw t;
  23. } finally {
  24. //补中断
  25. if (interrupted)
  26. selfInterrupt();
  27. }
  28. }

setHeadAndPropagate(Node, int)

源码如下:

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node); //head指向自己
  4. //如果还有剩余量,继续唤醒下一个邻居线程
  5. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  6. (h = head) == null || h.waitStatus < 0) {
  7. Node s = node.next;
  8. if (s == null || s.isShared())
  9. doReleaseShared();
  10. }
  11. }

acquireShared()小结

该方法跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作

流程:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

releaseShared()

此方法是共享模式下线程共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源,源码如下:

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) { //尝试释放资源
  3. doReleaseShared(); //唤醒后继结点
  4. return true;
  5. }
  6. return false;
  7. }

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

doReleaseShared()

此方法主要用于唤醒后继,下面是它的源码:

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
  8. continue; // loop to recheck cases
  9. unparkSuccessor(h);
  10. }
  11. else if (ws == 0 &&
  12. !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
  13. continue; // loop on failed CAS
  14. }
  15. if (h == head) // loop if head changed
  16. break;
  17. }
  18. }

小结

上文主要讲述了独占和共享模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码。值得注意的是:acquire和acquireShared方法中,线程在等待队列中都是忽略中断的。当然,AQS也支持响应中断,acquireInterruptibly()/acquireSharedInterruptibly()即可以响应中断(通过抛出异常的方式)。
参考:

  1. Java并发之AQS详解
  2. 从ReentrantLock的实现看AQS的原理及应用