概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  • 特点:
    state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    getState- 获取 state状态
    setState- 设置 state状态
    compareAndSetState- cas机制设置 state状态
    独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 MonitorEntryList
  • 提供了条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 MonitorWaitSet
  • 子类主要实现这样一些方法(默认抛出 UnsupportedOperationException
    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    • tryAcquire(int)独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int)独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int)共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
    • tryReleaseShared(int)共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

      框架结构

      它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
      AQS原理 - 图2
      ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
        再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要小于等于线程个数最好一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
        一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

源码详解

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的 FIFO队列来完成资源获取的排队工作,将每个要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

结点状态waitStatus

Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLEDSIGNALCONDITIONPROPAGATE、0。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒(**前继节点有责任换醒后继线程**)。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Conditionsignal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

注意:负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

获取独占锁

acquire(int)

该方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }
  • 分析
  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
  • 执行流程
  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

image.png

tryAcquire(int)

该方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,AbstractQueuedSynchronizer里的tryAcquire没有做方法实现,因为AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,之所以没有定义成abstract,是因为独占模式下只要实现tryAcquire-tryRelease两个方法,而共享模式下只用实现tryAcquireShared-tryReleaseShared两个方法。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口

  1. java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire 抽象类默认
  2. protected boolean tryAcquire(int arg) {
  3. throw new UnsupportedOperationException();
  4. }
  5. java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire 子类实现
  6. protected final boolean tryAcquire(int acquires) {
  7. return nonfairTryAcquire(acquires);
  8. }
  9. java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire 子类实现
  10. protected final boolean tryAcquire(int acquires) {
  11. final Thread current = Thread.currentThread();
  12. int c = getState();
  13. if (c == 0) {
  14. if (!hasQueuedPredecessors() &&
  15. compareAndSetState(0, acquires)) {
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }
  19. }
  20. else if (current == getExclusiveOwnerThread()) {
  21. int nextc = c + acquires;
  22. if (nextc < 0)
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }
  27. return false;
  28. }

addWaiter(Node)

该方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点

  1. private Node addWaiter(Node mode) {
  2. //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
  3. Node node = new Node(Thread.currentThread(), mode);
  4. //尝试快速方式直接放到队尾。
  5. Node pred = tail;
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. //上一步失败则通过enq入队。
  14. enq(node);
  15. return node;
  16. }

enq(Node)

该方法用于将node加入队尾

  1. private Node enq(final Node node) {
  2. //CAS"自旋",直到成功加入队尾
  3. for (;;) {
  4. Node t = tail;
  5. if (t == null) {
  6. //初始化 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. //正常流程,放入队尾
  11. node.prev = t;
  12. if (compareAndSetTail(t, node)) {
  13. t.next = node;
  14. return t;
  15. }
  16. }
  17. }
  18. }

首次进入该方法时,还没有对头和对尾的节点,会使用cas创建一个空节点
image.png
若是执行cas操作失败,则第二次循环进入else逻辑
image.png

acquireQueued(Node, int)

当执行到该方法时说明当前线程获取资源失败,已经被放入等待队列尾部了。当前线程需要进入等待状态状态,直到其他线程彻底释放资源后唤醒,简而言之就是让当前线程在等待队列中排队,直到被唤醒。

  1. 结点进入队尾后,检查状态,找到安全休息点;
  2. 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
  3. 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
    1. final boolean acquireQueued(final Node node, int arg) {
    2. //标记是否成功拿到资源
    3. boolean failed = true;
    4. try {
    5. //标记等待过程中是否被中断过
    6. boolean interrupted = false;
    7. //CAS自旋
    8. for (; ; ) {
    9. //拿到前驱节点
    10. final Node p = node.predecessor();
    11. //如果前驱是head,即该结点已成第二个节点,那么便有资格去尝试获取资源
    12. if (p == head && tryAcquire(arg)) {
    13. //拿到资源后,将head指向该结点。所以head所指的标杆结点,
    14. //就是当前获取到资源的那个结点或null。
    15. setHead(node);
    16. // setHead中node.prev已置为null,此处再将head.next置为null,
    17. //就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
    18. p.next = null;
    19. // 成功获取资源
    20. failed = false;
    21. //返回等待过程中是否被中断过
    22. return interrupted;
    23. }
    24. //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
    25. //如果不可中断的情况下被中断了,
    26. //那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
    27. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    28. //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
    29. interrupted = true;
    30. }
    31. } finally {
    32. // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
    33. //那么取消结点在队列中的等待。
    34. if (failed)
    35. cancelAcquire(node);
    36. }
    37. }

    shouldParkAfterFailedAcquire(Node, Node)

    该方法主要用于检查状态,看看自己是否真的可以进入等待了,防止队列前边的线程都放弃了。整个流程中,如果前驱结点的状态不是SIGNAL,那么当前线程就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。 ```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    1. //拿到前驱的状态
    2. int ws = pred.waitStatus;
    3. if (ws == Node.SIGNAL)
    4. //如果前驱节点状态为-1,则直接进入等待队列
    5. return true;
    6. if (ws > 0) {
    7. // 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,
    8. //并排在它的后边。
    9. do {
    10. node.prev = pred = pred.prev;
    11. } while (pred.waitStatus > 0);
    12. pred.next = node;
    13. } else {
    14. //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。
    15. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    16. }
    17. return false;
    }
  1. <a name="4wPZS"></a>
  2. ### parkAndCheckInterrupt()
  3.   如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。`park()`会让当前线程进入`waiting`状态。在此状态下,有两种途径可以唤醒该线程:1)被`unpark()`;2)被`interrupt()`。需要注意的是,`Thread.interrupted()`会清除当前线程的中断标记位。
  4. ```java
  5. private final boolean parkAndCheckInterrupt() {
  6. //调用park()使线程进入waiting状态
  7. LockSupport.park(this);
  8. //如果被唤醒,查看自己是不是被中断的。
  9. return Thread.interrupted();
  10. }

释放独占锁

release(int)

该方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. //找到头结点
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. //唤醒等待队列里的下一个线程(后继线程)
  7. unparkSuccessor(h);
  8. return true;
  9. }
  10. return false;
  11. }

tryRelease(int)

该方法尝试去释放指定量的资源,跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了,所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false

  1. protected boolean tryRelease(int arg) {
  2. throw new UnsupportedOperationException();
  3. }
  4. //java.util.concurrent.locks.ReentrantLock.Sync#tryRelease 子类实现
  5. protected final boolean tryRelease(int releases) {
  6. int c = getState() - releases;
  7. if (Thread.currentThread() != getExclusiveOwnerThread())
  8. throw new IllegalMonitorStateException();
  9. boolean free = false;
  10. if (c == 0) {
  11. free = true;
  12. setExclusiveOwnerThread(null);
  13. }
  14. setState(c);
  15. return free;
  16. }

unparkSuccessor(Node)

该方法用于唤醒等待队列中下一个线程,简单点说就是用unpark()唤醒等待队列中最前边的那个未放弃线程
结合acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断,即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的一次或多次调整,s也必然会跑到headnext结点,下一次自旋p==head就成立了,然后s把自己设置成head结点,表示自己已经获取到资源了,acquire()也返回了

  1. private void unparkSuccessor(Node node) {
  2. //node一般为当前线程所在的结点。
  3. int ws = node.waitStatus;
  4. if (ws < 0)
  5. //置零当前线程所在的结点状态,允许失败。
  6. compareAndSetWaitStatus(node, ws, 0);
  7. //找到下一个需要唤醒的结点s
  8. Node s = node.next;
  9. if (s == null || s.waitStatus > 0) {
  10. //如果为空或已取消
  11. s = null;
  12. for (Node t = tail; t != null && t != node; t = t.prev)
  13. // 从后向前找
  14. if (t.waitStatus <= 0)
  15. s = t;
  16. }
  17. if (s != null)
  18. //唤醒
  19. LockSupport.unpark(s.thread);
  20. }

release异常

如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结没法再被唤醒

  1. public class ReleaseException {
  2. static ReentrantLock lock = new ReentrantLock();
  3. public static void main(String[] args) {
  4. ReleaseException r = new ReleaseException();
  5. r.testReleaseException();
  6. }
  7. public void testReleaseException() {
  8. for (int i = 0; i < 2; i++) {
  9. new MyThread(i).start();
  10. //确保线程0先启动
  11. Sleeper.sleep(2);
  12. }
  13. //主线程等待60s
  14. Sleeper.sleep(60);
  15. }
  16. class MyThread extends Thread {
  17. private int index;
  18. public MyThread(int index) {
  19. this.index = index;
  20. }
  21. @Override
  22. public void run() {
  23. Thread.currentThread().setName("my-" + index);
  24. log.info("{}:加锁前", index);
  25. lock.lock();
  26. try {
  27. log.info("{}:已经获取锁", index);
  28. if (index == 0) {
  29. //线程0 sleep 3秒
  30. Sleeper.sleep(3);
  31. } else {
  32. //线程1 sleep 300秒
  33. Sleeper.sleep(30);
  34. }
  35. } finally {
  36. //通过debug,让线程0在unlock()->release()->unparkSuccessor(Node node)时,强制让node=null,从而让"int ws = node.waitStatus;"抛出NPE,无法执行后续的unpark操作。
  37. lock.unlock();
  38. log.info("{}:释放锁", index);
  39. }
  40. }
  41. }
  42. }

image.png

image.png
image.png
image.png
NPE后等了很久,线程1都没有得到执行,一直在死等

获取共享锁

acquireShared(int)

该方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列,直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

doAcquireShared(int)

该方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回

  1. private void doAcquireShared(int arg) {
  2. //加入队列尾部
  3. final Node node = addWaiter(Node.SHARED);
  4. //是否成功标志
  5. boolean failed = true;
  6. try {
  7. //等待过程中是否被中断过的标志
  8. boolean interrupted = false;
  9. for (;;) {
  10. //找到前驱节点
  11. final Node p = node.predecessor();
  12. //如果前驱节点是head,此时node被唤醒,很可能是head用完资源来唤醒自己的
  13. if (p == head) {
  14. //尝试获取资源
  15. int r = tryAcquireShared(arg);
  16. if (r >= 0) {
  17. //将head指向自己,还有剩余资源可以再唤醒之后的线程
  18. setHeadAndPropagate(node, r);
  19. p.next = null;
  20. if (interrupted)
  21. //如果等待过程中被打断过,此时将中断补上。
  22. selfInterrupt();
  23. failed = false;
  24. return;
  25. }
  26. }
  27. //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
  28. if (shouldParkAfterFailedAcquire(p, node) &&
  29. parkAndCheckInterrupt())
  30. interrupted = true;
  31. }
  32. } finally {
  33. if (failed)
  34. cancelAcquire(node);
  35. }
  36. }

setHeadAndPropagate(Node, int)

该方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head;
  3. //head指向自己
  4. setHead(node);
  5. //如果后面还有节点,继续唤醒下一个邻居线程
  6. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  7. (h = head) == null || h.waitStatus < 0) {
  8. Node s = node.next;
  9. if (s == null || s.isShared())
  10. doReleaseShared();
  11. }
  12. }

释放共享锁

releaseShared()

该方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
该方法跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的**tryRelease()**在完全释放掉资源(**state=0**)后,才会返回**true**去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。

  • 案例

资源总量是13,A->5B->7分别获取到资源并发运行,C->4来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

  1. public final boolean releaseShared(int arg) {
  2. //尝试释放资源
  3. if (tryReleaseShared(arg)) {
  4. //唤醒后继结点
  5. doReleaseShared();
  6. return true;
  7. }
  8. return false;
  9. }

doReleaseShared()

该方法主要用于唤醒后继。

  1. private void doReleaseShared() {
  2. for (;;) {
  3. //自旋
  4. Node h = head;
  5. if (h != null && h != tail) {
  6. int ws = h.waitStatus;
  7. if (ws == Node.SIGNAL) {
  8. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  9. continue;
  10. //唤醒后继
  11. unparkSuccessor(h);
  12. }
  13. else if (ws == 0 &&
  14. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  15. continue;
  16. }
  17. if (h == head)
  18. // head发生变化
  19. break;
  20. }
  21. }

主要用到 AQS 的并发工具类

image.png