AQS 自定义同步锁
AQSAbstractQueuedSynchronizer的简称。

AbstractQueuedSynchronizer 同步状态

AbstractQueuedSynchronizer 内部有一个state属性,用于指示同步的状态:

  1. private volatile int state;

state的字段是个int型的,它的值在AbstractQueuedSynchronizer中是没有具体的定义的,只有子类继承AbstractQueuedSynchronizer那么state才有意义,如在ReentrantLock中,state=0表示资源未被锁住,而state>=1的时候,表示此资源已经被另外一个线程锁住。
AbstractQueuedSynchronizer中虽然没有具体获取、修改state的值,但是它为子类提供一些操作state的模板方法:

获取状态

  1. protected final int getState() {
  2. return state;
  3. }

更新状态

  1. protected final void setState(int newState) {
  2. state = newState;
  3. }

CAS更新状态

  1. protected final boolean compareAndSetState(int expect, int update) {
  2. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  3. }

AQS 等待队列

AQS 等待列队是一个双向队列,队列中的成员都有一个prevnext成员,分别指向它前面的节点和后面的节点。
image.png

队列节点

AbstractQueuedSynchronizer内部,等待队列节点由内部静态类Node表示:

  1. static final class Node {
  2. ...
  3. }

节点模式

队列中的节点有两种模式:

  • 独占节点:同一时刻只能有一个线程访问资源,如ReentrantLock
  • 共享节点:同一时刻允许多个线程访问资源,如Semaphore

    节点的状态

    等待队列中的节点有五种状态:

  • CANCELLED:此节点对应的线程,已经被取消

  • SIGNAL:此节点的下一个节点需要一个唤醒信号
  • CONDITION:当前节点正在条件等待
  • PROPAGATE:共享模式下会传播唤醒信号,就是说当一个线程使用共享模式访问资源时,如果成功访问到资源,就会继续唤醒等待队列中的线程。

    自定义同步锁

    为了便于理解,使用AQS自己实现一个简单的同步锁,感受一下使用AQS实现同步锁是多么的轻松。
    下面的代码自定了一个CustomLock类,继承了AbstractQueuedSynchronizer,并且还实现了Lock接口。
    CustomLock类是一个简单的可重入锁,类中只需要重写AbstractQueuedSynchronizer中的tryAcquiretryRelease方法,然后在修改少量的调用就可以实现一个最基本的同步锁。

    1. public class CustomLock extends AbstractQueuedSynchronizer implements Lock {
    2. @Override
    3. protected boolean tryAcquire(int arg) {
    4. int state = getState();
    5. if(state == 0){
    6. if( compareAndSetState(state, arg)){
    7. setExclusiveOwnerThread(Thread.currentThread());
    8. System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了锁");
    9. return true;
    10. }
    11. }else if(getExclusiveOwnerThread() == Thread.currentThread()){
    12. int nextState = state + arg;
    13. setState(nextState);
    14. System.out.println("Thread: " + Thread.currentThread().getName() + "重入");
    15. return true;
    16. }
    17. return false;
    18. }
    19. @Override
    20. protected boolean tryRelease(int arg) {
    21. int state = getState() - arg;
    22. if(getExclusiveOwnerThread() != Thread.currentThread()){
    23. throw new IllegalMonitorStateException();
    24. }
    25. boolean free = false;
    26. if(state == 0){
    27. free = true;
    28. setExclusiveOwnerThread(null);
    29. System.out.println("Thread: " + Thread.currentThread().getName() + "释放了锁");
    30. }
    31. setState(state);
    32. return free;
    33. }
    34. @Override
    35. public void lock() {
    36. acquire(1);
    37. }
    38. @Override
    39. public void unlock() {
    40. release(1);
    41. }
    42. ...
    43. }

    CustomLock是实现了Lock接口,所以要重写lockunlock方法,不过方法的代码很少只需要调用AQS中的acquirerelease
    然后为了演示AQS的功能写了一个小演示程序,启动两根线程,分别命名为线程A线程B,然后同时启动,调用runInLock方法,模拟两条线程同时访问资源的场景:

    1. public class CustomLockSample {
    2. public static void main(String[] args) throws InterruptedException {
    3. Lock lock = new CustomLock();
    4. new Thread(()->runInLock(lock), "线程A").start();
    5. new Thread(()->runInLock(lock), "线程B").start();
    6. }
    7. private static void runInLock(Lock lock){
    8. try {
    9. lock.lock();
    10. System.out.println("Hello: " + Thread.currentThread().getName());
    11. Thread.sleep(2000);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }finally {
    15. lock.unlock();
    16. }
    17. }
    18. }

    访问资源(acquire)

    在CustomLock的lock方法中,调用了acquire(1)acquire的代码如下 :

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }
  • CustomLock.tryAcquire(…)CustomLock.tryAcquire 判断当前线程是否能够访问同步资源

  • addWaiter(…):将当前线程添加到等待队列的队尾,当前节点为独占模型(Node.EXCLUSIVE)
  • acquireQueued(…):如果当前线程能够访问资源,那么就会放行,如果不能那当前线程就需要阻塞。
  • selfInterrupt:设置线程的中断标记
    :::warning 注意: 在acquire方法中,如果tryAcquire(arg)返回true, 就直接执行完了,线程被放行了。所以的后面的方法调用acquireQueued、addWaiter都是tryAcquire(arg)返回false时才会被调用。 :::

    tryAcquire 的作用

    tryAcquire在AQS类中是一个直接抛出异常的实现:

    1. protected boolean tryAcquire(int arg) {
    2. throw new UnsupportedOperationException();
    3. }

    而在我们自定义的 CustomLock 中,重写了此方法:

    1. @Override
    2. protected boolean tryAcquire(int arg) {
    3. int state = getState();
    4. if(state == 0){
    5. if( compareAndSetState(state, arg)){
    6. setExclusiveOwnerThread(Thread.currentThread());
    7. System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了锁");
    8. return true;
    9. }
    10. }else if(getExclusiveOwnerThread() == Thread.currentThread()){
    11. int nextState = state + arg;
    12. setState(nextState);
    13. System.out.println("Thread: " + Thread.currentThread().getName() + "重入");
    14. return true;
    15. }
    16. return false;
    17. }

    tryAcquire方法返回一个布而值,true表示当前线程能够访问资源,false当前线程不能访问资源,所以tryAcquire的作用:决定线程是否能够访问受保护的资源tryAcquire里面的逻辑在子类可以自由发挥,AQS不关心这些,只需要知道能不能访问受保护的资源,然后来决定线程是放行还是进行等待队列(阻塞)。
    因为是在多线程环境下执行,所以不同的线程执行tryAcquire时会返回不同的值,假设线程A比线程B要快一步,先到达compareAndSetState设置state的值成员并成功,那线程A就会返回true,而 B 由于state的值不为0或者compareAndSetState执行失败,而返回false。

    线程B 抢占锁流程

    上面访问到线程A成功获得了锁,那线程B就会抢占失败,接着执行后面的方法。

    线程的入队

    线程的入队是逻辑是在addWaiter方法中,addWaiter方法的具体逻辑也不需要说太多,如果知道链表的话,就非常容易理解了,最终的结果就是将新线程添加到队尾。AQS的中有两个属性headtail分别指定等待队列的队首和队尾。

    1. private Node addWaiter(Node mode) {
    2. Node node = new Node(Thread.currentThread(), mode);
    3. // Try the fast path of enq; backup to full enq on failure
    4. Node pred = tail;
    5. if (pred != null) {
    6. node.prev = pred;
    7. if (compareAndSetTail(pred, node)) {
    8. pred.next = node;
    9. return node;
    10. }
    11. }
    12. enq(node);
    13. return node;
    14. }
    15. private Node enq(final Node node) {
    16. for (;;) {
    17. Node t = tail;
    18. if (t == null) { // Must initialize
    19. if (compareAndSetHead(new Node()))
    20. tail = head;
    21. } else {
    22. node.prev = t;
    23. if (compareAndSetTail(t, node)) {
    24. t.next = node;
    25. return t;
    26. }
    27. }
    28. }
    29. }

    需要注意的是在enq方法中,初始化队列的时候,会新建一个Node做为headtail,然后在之后的循环中将参数node添加到队尾,队列初始化完后,里面会有两个节点,一个是空的结点new Node()另外一个就是对应当前线程的结点。
    由于线程A在tryAcquire时返回了true,所以它会被直接放行,那么只有B线程会进入addWaiter方法,此时的等待队列如下:
    image.png
    注意: 等待队列内的节点都是正在等待资源的线程,如果一个线程直接能够访问资源,那它压根就不需要进入等待队列,会被放行。

    线程B 的阻塞

    线程B被添加到等待队列的尾部后,会继续执行acquireQueued方法,这个方法就是AQS阻塞线程的地方,acquireQueued方法代码的一些解释:

  • 外面是一个for (;;)无限循环,这个很重要

  • 会重新调用一次tryAcquire(arg)判断线程是否能够访问资源了
  • node.predecessor()获取参数node的前一个节点
  • shouldParkAfterFailedAcquire判断当前线程获取锁失败后,需不需要阻塞
  • parkAndCheckInterrupt()使用LockSupport阻塞当前线程,
    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. final Node p = node.predecessor();
    7. if (p == head && tryAcquire(arg)) {
    8. setHead(node);
    9. p.next = null; // help GC
    10. failed = false;
    11. return interrupted;
    12. }
    13. if (shouldParkAfterFailedAcquire(p, node) &&
    14. parkAndCheckInterrupt())
    15. interrupted = true;
    16. }
    17. } finally {
    18. if (failed)
    19. cancelAcquire(node);
    20. }
    21. }
    shouldParkAfterFailedAcquire 判断是否要阻塞
    shouldParkAfterFailedAcquire接收两个参数:前一个节点、当前节点,它会判断前一个节点的waitStatus属性,如果前一个节点的waitStatus=Node.SIGNAL就会返回true:
    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. int ws = pred.waitStatus;
    3. if (ws == Node.SIGNAL)
    4. return true;
    5. if (ws > 0) {
    6. do {
    7. node.prev = pred = pred.prev;
    8. } while (pred.waitStatus > 0);
    9. pred.next = node;
    10. } else {
    11. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    12. }
    13. return false;
    14. }
    acquireQueued方法在循环中会多次调用shouldParkAfterFailedAcquire,在等待队列中节点的waitStatus的属性默认为0,所以第一次执行shouldParkAfterFailedAcquire会执行:
    1. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    更新完pred.waitStatus后,节点的状态如下:
    image.png
    然后shouldParkAfterFailedAcquire返回false,回到acquireQueued的循环体中,又去抢锁还是失败了,又会执行shouldParkAfterFailedAcquire,第二次循环时此时的pred.waitStatus等于Node.SIGNAL那么就会返回true。
    parkAndCheckInterrupt 阻塞线程
    这个方法就比较直观了, 就是将线程的阻塞住:
    1. private final boolean parkAndCheckInterrupt() {
    2. LockSupport.park(this);
    3. return Thread.interrupted();
    4. }
    为什么是一个for (;;)无限循环呢
    先看一个for (;;)的退出条件,只有node的前一个节点是head并且tryAcquire返回true时才会退出循环,否则的话线程就会被parkAndCheckInterrupt阻塞。
    线程被parkAndCheckInterrupt阻塞后就不会向下面执行了,但是等到它被唤醒后,它还在for (;;)体中,然后又会继续先去抢占锁,然后如果还是失败,那又会处于等待状态,所以一直循环下去,就只有两个结果:
  1. 抢到锁退出循环
  2. 抢占锁失败,等待下一次唤醒再次抢占锁

    线程 A 释放锁

    线程A的业务代码执行完成后,会调用CustomLock.unlock方法,释放锁。unlock方法内部调用的release(1)

    1. public void unlock() {
    2. release(1);
    3. }

    release是AQS类的方法,它跟acquire相反是释放的意思:

    1. public final boolean release(int arg) {
    2. if (tryRelease(arg)) {
    3. Node h = head;
    4. if (h != null && h.waitStatus != 0)
    5. unparkSuccessor(h);
    6. return true;
    7. }
    8. return false;
    9. }

    方法体中的tryRelease是不是有点眼熟,没错,它也是在实现CustomLock类时重写的方法,首先在tryRelease中会判断当前线程是不是已经获得了锁,如果没有就直接抛出异常,否则的话计算state的值,如果state为0的话就可以释放锁了。

    1. protected boolean tryRelease(int arg) {
    2. int state = getState() - arg;
    3. if(getExclusiveOwnerThread() != Thread.currentThread()){
    4. throw new IllegalMonitorStateException();
    5. }
    6. boolean free = false;
    7. if(state == 0){
    8. free = true;
    9. setExclusiveOwnerThread(null);
    10. System.out.println("Thread: " + Thread.currentThread().getName() + "释放了锁");
    11. }
    12. setState(state);
    13. return free;
    14. }

    release方法只做了两件事:

  3. 调用tryRelease判断当前线程释放锁是否成功

  4. 如果当前线程锁释放锁成功,唤醒其他线程(也就是正在等待中的B线程)

tryRelease返回true后,会执行if里面的代码块:

  1. if (tryRelease(arg)) {
  2. Node h = head;
  3. if (h != null && h.waitStatus != 0)
  4. unparkSuccessor(h);
  5. return true;
  6. }

先回顾一下现在的等待队列的样子:
image.png
根据上面的图,来走下流程:

  • 首先拿到head属性的对象,也就是队列的第一个对象
  • 判断head不等于空,并且waitStatus!=0,很明显现在的waitStatus是等于Node. SIGNAL的,它的值是-1

所以if (h != null && h.waitStatus != 0)这个if肯定是满足条件的,接着执行unparkSuccessor(h)

  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus;
  3. if (ws < 0)
  4. compareAndSetWaitStatus(node, ws, 0);
  5. Node s = node.next;
  6. ...
  7. if (s != null)
  8. LockSupport.unpark(s.thread);
  9. }

unparkSuccessor首先将node.waitStatus设置为0,然后获取node的下一个节点,最后调用LockSupport.unpark(s.thread)唤醒线程,至此我们的B线程就被唤醒了。
此时的队列又回到了,线程B刚刚入队的样子:
image.png

线程B 唤醒之后

线程A释放锁后,会唤醒线程B,回到线程B的阻塞点,acquireQueued的for循环中:

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

线程唤醒后的第一件事就是,拿到它的上一个节点(当前是head结点),然后使用if判断

  1. if (p == head && tryAcquire(arg))

根据现在等待队列中的节点状态,p == head是返回true的,然后就是tryAcquire(arg)了,由于线程A已经释放了锁,那现在的线程B自然就能获取到锁了,所以tryAcquire(arg)也会返回true。

设置队列头

线路B拿到锁后,会调用setHead(node)自己设置为队列的头:

  1. private void setHead(Node node) {
  2. head = node;
  3. node.thread = null;
  4. node.prev = null;
  5. }

调用setHead(node)后队列会发生些变化 :
image.png

移除上一个节点

setHead(node)执行完后,接着按上一个节点完全移除:

  1. p.next = null;

此时的队列:
image.png

线程B 释放锁

线程B 释放锁的流程与线程A基本一致,只是当前队列中已经没有需要唤醒的线程,所以不需要执行代码去唤醒其他线程:

  1. if (tryRelease(arg)) {
  2. Node h = head;
  3. if (h != null && h.waitStatus != 0)s
  4. unparkSuccessor(h);
  5. return true;
  6. }

h != null && h.waitStatus != 0这里的h.waitStatus已经是0了,不满足条件,不会去唤醒其他线程。

总结

通过自定义一个CustomLock类,然后通过查看AQS源码来学习AQS的部分原理。通过完整的走完锁的获取、释放两个流程,加深对AQS的理解。