定义

队列同步器,构建锁和其他同步类的基础组件。它是JUC并发包下的核心基础组件。AQS解决了子类实现同步器时的大量细节问题,如获取同步状态,FIFO同步队列。AQS不仅减少了大量的实现工作,也不必处理在多个位置发生的竞争问题。在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换造成的开销,提高了吞吐量,同时在设计AQS时考虑到了可伸缩性。

先上一张AQS的框架图
AQS框架图.jpeg

AQS内部方法

AQS本身实现的方法

  • acquire(int arg): 独占式的获得锁,独占式获取同步状态都调用这个方法,通过子类实现的tryAcquire方法判断是否获得到了锁。
  • acquireShared(int arg): 共享式的获得锁,例如读写锁,通过子类实现的tryAcquireShared方法判断是否获得锁
  • release(int arg): 独占式的释放锁,通过子类的tryRelease方法判断是否释放了锁
  • releaseShared(int arg): 共享式的释放锁,通过子类的tryReleaseShared方法判断是否释放了锁

    AQS子类实现的方法

  • tryAcquire(int arg):独占式的获取锁,返回值是boolean类型的,true代表获取锁,false代表获取失败。

  • tryRelease(int arg):释放独占式同步状态,释放操作会唤醒其后继节点获取同步状态。
  • tryAcquireShared(int arg):共享式的获取同步状态,返回大于0代表获取成功,否则就是获取失
  • tryReleaseShared(int arg):共享式的释放同步状态。
  • isHeldExclusively():判断当前的线程是否已经获取到了同步状态。


源码分析以及原理

AQS内部维护了一个Node节点类和一个ConditionObject类。
Node类维护了一个双向的FIFO队列,用来保存阻塞中的线程和获取同步状态的线程,ConditionObject对应的是Lock的等待通知机制。
image.png

image.png

waitstatus

Node节点类的volatile修饰的属性,代表同步节点的等待状态:

  • initial:值为0,代初始状态值,表示当前没有线程获得锁
  • cancelled:值为1 ,由于超时或者中断,节点被设置为取消状态。被设置为取消状态的节点不应该再去竞争锁,它应该一直是被取消的状态,不能变为其他的状态。处于这种状态的节点会被踢出队列,等待着被GC。
  • signal:值为-1,后继节点的线程处于等待状态,当前节点的线程释放了同步状态或者取消,会通知后继节点去获取锁
  • Condition:值为-2,节点在等待队列中,节点线程等待在Condition,当其它线程对Condition调用了signal方法,该节点会从等待队列移到同步队列中。
  • propagate:值为-3,表示下一次共享式同步状态获取将会被无条件的被传播下去(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态)

    state

    AQS类的volatile修饰的属性,代表锁的状态

acquire(int arg):

独占式的获得锁,尝试获得锁,获取失败,加入队列

  1. public final void acquire(int arg){
  2. if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSED),arg)){
  3. //如果这个过程出现中断,那在整个过程之后再自我中断
  4. selfInterrupt();
  5. }
  6. }

addWaiter(Node mode)

往同步队列中添加元素

  1. private Node addWaiter(Node mode){
  2. Node node = new Node(Thread.currentThread(),mode);
  3. Node pred = tail;
  4. if(pred != null){
  5. node.prev = pred;
  6. if(compareAndSetTail(pred,node)){//CAS设置尾节点保证线程的安全性
  7. pred.next = node;
  8. return node;
  9. }
  10. }
  11. enq(node);//防止CAS操作失败,再次处理入队列
  12. return node;
  13. }

addWaiter()方法主要是创建一个节点,通过CAS的方法添加到队列的尾部。最后再次处理入队列是以防CAS操作失败。
image.png

enq(Node node)

防止CAS失败,重入队列

  1. private Node enq(Node node){
  2. for(;;){
  3. Node t = tail;
  4. if(t == null){
  5. if(compareAndSetHead(new Node())){//创建一个新的节点并添加到队列初始化
  6. tail - head;
  7. }
  8. }else{
  9. node.prev = t;
  10. if(compareAndSetTail(t,node)){
  11. t.next = node;
  12. return t;
  13. }
  14. }
  15. }
  16. }

通过死循环,不断的CAS设置尾节点,直到成功返回

acquireQueued(Node node,int arg)

当线程获取锁失败,并加入到同步队列中以后,就进入到一个自旋的状态,如果获取到这个状态就退出阻塞,否则一直阻塞。

  1. final boolean acquireQueued(final Node node,int arg){
  2. boolean faild = true;//是否获取了同步状态
  3. try{
  4. boolean interrupted = false;//获取状态过程中是否被中断
  5. for(;;){
  6. final Node p = node.predecessor();//前继节点
  7. if(p == head && tryAcquire(arg)){//如果前继节点是头节点并且尝试获取锁成功
  8. setHead(node);
  9. p.next = null;
  10. failed = false;
  11. return interrupted;
  12. }
  13. //判断自己是否已经阻塞了检查这个过程中是否被阻塞过
  14. if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){
  15. interrupted = true;
  16. }
  17. }
  18. }finally{
  19. if(failed){
  20. cacelAcquire(node);
  21. }
  22. }
  23. }

acquireQueued方法主要是让线程通过自旋去获取同步状态,因为使用的是FIFO队列,所以只有头结点的后继节点才有资格去获取同步状态,如果线程可以休息了就让该线程休息并记录过程中是否被中断过。当线程获取了同步状态就会从同步队列中移除这个节点。同时还会设置获取同步状态的这个节点(线程)为头节点。在设置头节点的时候不需要任何同步操作,因为独占锁中能获取同步状态的一定是同一线程。

shouldParkAfterFailedAcquire(Node node,Node node)

判断一个线程是否阻塞

  1. private static boolean shouldPArkAfterFailedAcquire(Node pred,Node node){
  2. int ws = pred.waitStatus;//获取节点的等待状态
  3. if(ws == Node.SIGNAL){//如果是SIGNAL就代表当头节点释放后,这个节点就会去尝试获取状态
  4. return true;//代表阻塞中
  5. }
  6. if(ws > 0){//代表前继节点放弃了
  7. do {
  8. node.prev = pred = pred.prev;//循环不停的往前找知道找到节点的状态是正常的
  9. }while(pred.waitStatus > 0 );
  10. pred.next = node;
  11. }else{
  12. compareAndSetWaitStatus(pred,ws,Node.SIGNAL);//通过CAS操作设置状态为SIGNAL
  13. }
  14. return false;
  15. }

parkAndCheckInterrupt()

前面的方法是判断是否阻塞,而这个方法就是真正的执行阻塞的方法同时返回中断状态

  1. private final boolean parkAndCheckInterupt(){
  2. LockSupport.park(this);//阻塞当前线程
  3. return Thread.interrupted();//返回中断状态
  4. }

acquire整体流程

  • 首先通过子类判断是否获取了锁,获取不到就加入到同步队列中
  • 当线程在队列中通过不停的自旋去获取同步状态。如果获取到了锁,就把其设为同步队列的头部,否则在队列中不停的自旋来获取同步状态。
  • 如果获取同步状态的过程中被中断过则自行调用interrupt方法自我中断

image.png

release(int arg)

独占式的释放锁

  1. public final boolean release(int arg){
  2. if(tryRelease(arg)){
  3. Node h = head;//获取头结点
  4. if(h != null && h.waitStatus != 0){//如果头结点不为空并且头结点的waitStatus不为0(0代表资源没有线程加锁)
  5. unparkSuccessor(h);//唤醒下一个节点
  6. }
  7. return true;
  8. }
  9. return false;
  10. }

unparkSuccessor(Node node)

唤醒后继节点获取同步状态

  1. private void unparkSuccessor(Node node){
  2. int ws = node.waitStatus;
  3. if(ws < 0){
  4. compareAndSetWaitStatus(node,ws,0);//通过CAS将头结点的状态设置为初始状态
  5. }
  6. Node s = node.next;
  7. if(s == null || s.waitSattus > 0){//不存在或者已经取消
  8. s = null;
  9. for(Node t = tail;t != null && t != node; t = t.prev){
  10. if(t.waitStatus <= 0){//从尾节点开始往前遍历,寻找离头结点最近的并且是正常状态的节点
  11. s = t;
  12. }
  13. }
  14. }
  15. if(s != null){
  16. lockSupport.unpark(s.thread);
  17. }
  18. }