一、概念

AbstractQueuedSynchronizer 队列同步器简称 AQS,它是实现同步器的基础组件,juc 下面 Lock 的实现以及一些并发工具类就是通过 AQS 来实现的。

二、基本思想

  • AQS 是一个通过内置的 FIFO 双向队列来完成线程的排队工作(内部通过结点 head 和 tail 记录队首和队尾元素,元素的结点类型为 Node 类型)
  1. /* 等待队列的队首结点(懒加载,这里体现为竞争失败的情况下,加入同步队列的线程执行到enq方法的时候会创
  2. 建一个Head结点)。该结点只能被setHead方法修改。并且结点的waitStatus不能为CANCELLED */
  3. private transient volatile Node head;
  4. /* 等待队列的尾节点,也是懒加载的。(enq方法)只在加入新的阻塞结点的情况下修改 */
  5. private transient volatile Node tail;
  • 其中 Node 中的 thread 用来存放进入 AQS 队列中的线程引用,Node 结点内部的 SHARED 表示标记线程是因为获取共享资源失败被阻塞添加到队列中的;Node 中的 EXCLUSIVE 表示线程因为获取独占资源失败被阻塞添加到队列中的。waitStatus 表示当前线程的等待状态:
  1. /** 指示节点在共享模式下等待的标记 */
  2. static final Node SHARED = new Node();
  3. /** 指示节点正在排他模式中等待的标记 */
  4. static final Node EXCLUSIVE = null;
  5. /** 表示线程因为中断或者等待超时,需要从等待队列中退出,不可以在回到队列,等待 GC 回收 */
  6. static final int CANCELLED = 1;
  7. /** 等待状态值,指示后续线程被阻塞 */
  8. static final int SIGNAL = -1;
  9. /** 表示线程处于等待状态,结点在等待队列中,条件阻塞 */
  10. static final int CONDITION = -2;
  11. /** 表示下一次共享状态获取将会传递给后继结点获取这个共享同步状态,指头节点的状态 */
  12. static final int PROPAGATE = -3;
  • AQS 中维持了一个单一的 volatile 修饰的状态信息 state(AQS 通过 Unsafe 的相关方法,以原子性的方式由线程去获取这个state)AQS 提供了 getState()、setState()、compareAndSetState() 函数修改值(实际上调用的是unsafe的compareAndSwapInt方法)
  1. // 这就是我们刚刚说到的head结点,懒加载的(只有竞争失败需要构建同步队列的时候,才会创建这个head),
  2. // 如果头节点存在,它的waitStatus不能为CANCELLED
  3. private transient volatile Node head;
  4. // 当前同步队列尾节点的引用,也是懒加载的,只有调用enq方法的时候会添加一个新的wait node
  5. private transient volatile Node tail;
  6. // AQS核心:同步状态
  7. private volatile int state;
  8. protected final int getState() {
  9. return state;
  10. }
  11. protected final void setState(int newState) {
  12. state = newState;
  13. }
  14. protected final boolean compareAndSetState(int expect, int update) {
  15. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  16. }
  • AQS 的设计师基于模板方法模式的。使用时候需要继承同步器并重写指定的方法,并且通常将子类推荐为定义同步组件的静态内部类,子类重写这些方法之后,AQS 工作时使用的是提供的模板方法,在这些模板方法中调用子类重写的方法。
  1. // 独占式的获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
  2. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
  3. // 独占式的释放同步状态,等待获取同步状态的线程可以有机会获取同步状态
  4. protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
  5. // 共享式的获取同步状态
  6. protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
  7. // 尝试将状态设置为以共享模式释放同步状态。 该方法总是由执行释放的线程调用。
  8. protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
  9. // 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
  10. protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
  • AQS 的内部类 ConditionObject 是通过结合锁实现线程同步,ConditionObjec t可以直接访问 AQS 的变量(state、queue),ConditionObject 是个条件变量 ,每个 ConditionObject 对应一个队列用来存放线程调用condition 条件变量的 await 方法之后被阻塞的线程

三、CLH 同步队列

AQS 会把所有的请求线程构成一个 CLH 队列,当一个线程执行完毕时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态

image.png

四、核心方法

AQS 子类定义为非公共内部帮助器类(私有的内部类,并继承 AQS),提供获取锁和释放锁的功能模板

  • void acquire(int arg) 以独占模式获取对象,忽略中断。

  • void acquireShared(int arg) 以共享模式获取对象,忽略中断。

  • boolean tryAcquire(int arg) 试图在独占模式下获取对象状态。

  • boolean tryAcquireShared(int arg) 试图在共享模式下获取对象状态

  • boolean release(int arg) 以独占模式释放对象。

  • boolean releaseShared(int arg) 以共享模式释放对象。

五、利用 AQS 实现重入锁

继承 Lock 接口,然后利用 AQS 模板方法实现 tryAcquire 和 tryRelease 方法即可

自定义重入锁的实现

  1. public class MyLock implements Lock {
  2. // 实例化帮助其
  3. private Helper helper = new Helper();
  4. // 继承了 AQS 模板
  5. private static class Helper extends AbstractQueuedSynchronizer {
  6. // 获取锁
  7. @Override
  8. protected boolean tryAcquire(int arg) {
  9. int state = getState();
  10. if (state == 0) {
  11. // 利用 CAS 原理修改 state
  12. if (compareAndSetState(0, arg)) {
  13. // 设置当前线程占有资源(独占)
  14. setExclusiveOwnerThread(Thread.currentThread());
  15. return true;
  16. }
  17. }
  18. // 添加可重入性判断,判断当前线程拥有锁的是不是自己
  19. else if (getExclusiveOwnerThread() == Thread.currentThread()) {
  20. setState(getState() + arg);
  21. return true;
  22. }
  23. return false;
  24. }
  25. // 释放锁
  26. @Override
  27. protected boolean tryRelease(int arg) {
  28. int state = getState() - arg;
  29. // 判断释放后是否为 0
  30. if (state == 0) {
  31. // setExclusiveOwnerThread(null);
  32. setState(state);
  33. return true;
  34. }
  35. // 不存在线程安全问题,因为已经有线程独占了资源
  36. setState(state); // 重入性问题
  37. return false;
  38. }
  39. public Condition newConditionObject() {
  40. return new ConditionObject();
  41. }
  42. }
  43. @Override
  44. public void lock() {
  45. helper.acquire(1);
  46. }
  47. @Override
  48. public void lockInterruptibly() throws InterruptedException {
  49. helper.acquireInterruptibly(1);
  50. }
  51. @Override
  52. public boolean tryLock() {
  53. return helper.tryAcquire(1);
  54. }
  55. // 指定时间内获取不到锁,就抛出中断异常
  56. @Override
  57. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  58. return helper.tryAcquireNanos(1, unit.toNanos(time));
  59. }
  60. @Override
  61. public void unlock() {
  62. helper.release(1);
  63. }
  64. @Override
  65. public Condition newCondition() {
  66. return helper.newConditionObject();
  67. }
  68. }

锁的应用

解决多线程并发问题

保证变量的原子性操作

  1. public class MyLockDemo01 {
  2. private MyLock lock = new MyLock();
  3. private int m = 0;
  4. public int increase() {
  5. lock.lock();
  6. try{
  7. return m++;
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. public static void main(String[] args) {
  13. MyLockDemo01 demo = new MyLockDemo01();
  14. Thread[] threads = new Thread[20];
  15. for (int i = 0; i < 20; i++) {
  16. threads[i] = new Thread(() -> System.out.println(demo.increase()));
  17. threads[i].start();
  18. }
  19. }
  20. }

解决线程重入问题

这也是 ReentrantLock 的实现方式

  1. public class MyLockDemo02 {
  2. private MyLock lock = new MyLock();
  3. public void a() {
  4. lock.lock();
  5. System.out.println("a is running");
  6. b();
  7. lock.unlock();
  8. }
  9. private void b() {
  10. lock.lock(); // 可重入性问题
  11. System.out.println("b is running");
  12. lock.unlock();
  13. }
  14. public static void main(String[] args) {
  15. MyLockDemo02 demo = new MyLockDemo02();
  16. new Thread(demo::a).start();
  17. }
  18. }