AQS是AbustactQueuedSynchronizer的简称,AQS的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks包下面
它是一个Java提供的底层同步工具类,AQS是一个用来构建锁和同步器的框架,用一个int类型的变量表示同步状态,并提供了一系列的CAS操作来管理这个同步状态。使用AQS能简单且高效地构造出应用广泛的大量的同步器,
比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
如果你搞懂AQS,那么JUC包下的绝大多数同步的类你都能掌握.
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制
AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
看个AQS(AbstractQueuedSynchronizer)原理图:
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过protected类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
} //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect (期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS,也就是队列同步器,这是实现 Lock 的基础。下图就是 AQS 的结构图,从图中可以看出,AQS 有一个 state 标记位,值为1 时表示有线程占用,其他线程需要进入到同步队列等待。同步队列是一个双向链表。
当获得锁的线程需要等待某个条件时,会进入 condition 的等待队列,等待队列可以有多个。当 condition 条件满足时,线程会从等待队列重新进入同步队列进行获取锁的竞争。ReentrantLock 就是基于 AQS 实现的,如下图所示,ReentrantLock 内部有公平锁和非公平锁两种实现,差别就在于新来的线程是否比已经在同步队列中的等待线程更早获得锁。
和 ReentrantLock 实现方式类似,Semaphore 也是基于 AQS 的,差别在于 ReentrantLock 是独占锁,Semaphore 是共享锁。
整体架构图
这个图总结了 AQS 整体架构的组成,和部分场景的动态流向,图中两个点说明一下,方便大家观看。
1. AQS 中队列只有两个:同步队列 + 条件队列,底层数据结构两者都是链表;
2. 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序。
AQS 本身就是一套锁的框架,它定义了获得锁和释放锁的代码结构,所以如果要新建锁,只要继承 AQS,并实现相应方法即可。
AQS有两个功能:
- 独占(互斥,在同一时刻只能有一个线程获取锁.)
2. 共享(读写锁的读锁, 共享意思是允许多个线程同时获取锁)
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
- Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
AQS三大核心部分
<br />第一个是 state状态,它是一个数值,在不同的类中表示不同的含义,往往代表一种状态;<br />第二个是一个队列,该队列用来存放线程;<br />第三个是“获取/释放”的相关方法,需要利用 AQS 的工具类根据自己的逻辑去实现。
state 状态
第一个要讲解的是状态 state,如果我们的 AQS 想要去管理或者想作为协作工具类的一个基础框架,那么它必然要管理一些状态,而这个状态在 AQS 内部就是用 state 变量去表示的。它的定义如下:
* The synchronization state.
*/
private volatile int state;
而 state 的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义,下面举几个例子。
比如说在信号量里面,state 表示的是剩余许可证的数量。如果我们最开始把 state 设置为 10,这就代表许可证初始一共有 10 个,然后当某一个线程取走一个许可证之后,这个 state 就会变为 9,所以信号量的 state 相当于是一个内部计数器。
再比如,在 CountDownLatch 工具类里面,state 表示的是需要“倒数”的数量。一开始我们假设把它设置为 5,当每次调用 CountDown 方法时,state 就会减 1,一直减到 0 的时候就代表这个门闩被放开。
下面我们再来看一下 state 在 ReentrantLock 中是什么含义,在 ReentrantLock 中它表示的是锁的占有情况。最开始是 0,表示没有任何线程占有锁;如果 state 变成 1,则就代表这个锁已经被某一个线程所持有了。
那为什么还会变成 2、3、4 呢?为什么会往上加呢?因为 ReentrantLock 是可重入的,同一个线程可以再次拥有这把锁就叫重入。如果这个锁被同一个线程多次获取,那么 state 就会逐渐的往上加,state 的值表示重入的次数。在释放的时候也是逐步递减,比如一开始是 4,释放一次就变成了 3,再释放一次变成了 2,这样进行的减操作,即便是减到 2 或者 1 了,都不代表这个锁是没有任何线程持有,只有当它减到 0 的时候,此时恢复到最开始的状态了,则代表现在没有任何线程持有这个锁了。所以,state 等于 0 表示锁不被任何线程所占有,代表这个锁当前是处于释放状态的,其他线程此时就可以来尝试获取了。
这就是 state 在不同类中不同含义的一个具体表现。我们举了三个例子,如果未来有新的工具要利用到 AQS,它一定也需要利用 state,为这个类表示它所需要的业务逻辑和状态。
下面我们再来看一下关于 state 修改的问题,因为 state 是会被多个线程共享的,会被并发地修改,所以所有去修改 state 的方法都必须要保证 state 是线程安全的。可是 state 本身它仅仅是被 volatile 修饰的,volatile 本身并不足以保证线程安全,所以我们就来看一下,AQS 在修改 state 的时候具体利用了什么样的设计来保证并发安全。
我们举两个和 state 相关的方法,分别是 compareAndSetState 及 setState,它们的实现已经由 AQS 去完成了,也就是说,我们直接调用这两个方法就可以对 state 进行线程安全的修改。下面就来看一下这两个方法的源码是怎么实现的。
- 先来看一下 compareAndSetState 方法,这是一个我们非常熟悉的 CAS 操作,这个方法的代码,如下所示:
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
方法里面只有一行代码,即 return unsafe.compareAndSwapInt(this, stateOffset, expect, update),这个方法我们已经非常熟悉了,它利用了 Unsafe 里面的 CAS 操作,利用 CPU 指令的原子性保证了这个操作的原子性,与之前介绍过的原子类去保证线程安全的原理是一致的。
- 接下来看一下 setState 方法的源码,如下所示:
protected final void setState(int newState) { state = newState; }
我们可以看到,它去修改 state 值的时候非常直截了当,直接把 state = newState,这样就直接赋值了。你可能会感到困惑,这里并没有进行任何的并发安全处理,没有加锁也没有 CAS,那如何能保证线程安全呢?
这里就要说到 volatile 的作用了,前面在学习 volatile 关键字的时候,知道了它适用于两种场景,其中一种场景就是,当对基本类型的变量进行直接赋值时,如果加了 volatile 就可以保证它的线程安全。注意,这是 volatile 的非常典型的使用场景。
* The synchronization state.
*/
private volatile int state;
可以看出,state 是 int 类型的,属于基本类型,并且这里的 setState 方法内是对 state 直接赋值的,它不涉及读取之前的值,也不涉及在原来值的基础上再修改,所以我们仅仅利用 volatile 就可以保证在这种情况下的并发安全,这就是 setState 方法线程安全的原因。
下面我们对 state 进行总结,在 AQS 中有 state 这样的一个属性,是被 volatile 修饰的,会被并发修改,它代表当前工具类的某种状态,在不同的类中代表不同的含义。
FIFO 队列
下面我们再来看看 AQS 的第二个核心部分,FIFO 队列,即先进先出队列,这个队列最主要的作用是存储等待的线程。假设很多线程都想要同时抢锁,那么大部分的线程是抢不到的,那怎么去处理这些抢不到锁的线程呢?就得需要有一个队列来存放、管理它们。所以 AQS 的一大功能就是充当线程的“排队管理器”。
当多个线程去竞争同一把锁的时候,就需要用排队机制把那些没能拿到锁的线程串在一起;而当前面的线程释放锁之后,这个管理器就会挑选一个合适的线程来尝试抢刚刚释放的那把锁。所以 AQS 就一直在维护这个队列,并把等待的线程都放到队列里面。
这个队列内部是双向链表的形式,其数据结构看似简单,但是要想维护成一个线程安全的双向队列却非常复杂,因为要考虑很多的多线程并发问题。我们来看一下 AQS 作者 Doug Lea 给出的关于这个队列的一个图示:
在队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点。头节点可以理解为“当前持有锁的线程”,而在头节点之后的线程就被阻塞了,它们会等待被唤醒,唤醒也是由 AQS 负责操作的。
获取/释放方法
下面我们就来看一看 AQS 的第三个核心部分,获取/释放方法。在 AQS 中除了刚才讲过的 state 和队列之外,还有一部分非常重要,那就是获取和释放相关的重要方法,这些方法是协作工具类的逻辑的具体体现,需要每一个协作工具类自己去实现,所以在不同的工具类中,它们的实现和含义各不相同。
获取方法
我们首先来看一下获取方法。获取操作通常会依赖 state 变量的值,根据 state 值不同,协作工具类也会有不同的逻辑,并且在获取的时候也经常会阻塞,下面就让我们来看几个具体的例子。
比如 ReentrantLock 中的 lock 方法就是其中一个“获取方法”,执行时,如果发现 state 不等于 0 且当前线程不是持有锁的线程,那么就代表这个锁已经被其他线程所持有了。这个时候,当然就获取不到锁,于是就让该线程进入阻塞状态。
再比如,Semaphore 中的 acquire 方法就是其中一个“获取方法”,作用是获取许可证,此时能不能获取到这个许可证也取决于 state 的值。如果 state 值是正数,那么代表还有剩余的许可证,数量足够的话,就可以成功获取;但如果 state 是 0,则代表已经没有更多的空余许可证了,此时这个线程就获取不到许可证,会进入阻塞状态,所以这里同样也是和 state 的值相关的。
再举个例子,CountDownLatch 获取方法就是 await 方法(包含重载方法),作用是“等待,直到倒数结束”。执行 await 的时候会判断 state 的值,如果 state 不等于 0,线程就陷入阻塞状态,直到其他线程执行倒数方法把 state 减为 0,此时就代表现在这个门闩放开了,所以之前阻塞的线程就会被唤醒。
我们总结一下,“获取方法”在不同的类中代表不同的含义,但往往和 state 值相关,也经常会让线程进入阻塞状态,这也同样证明了 state 状态在 AQS 类中的重要地位。
释放方法
释放方法是站在获取方法的对立面的,通常和刚才的获取方法配合使用。我们刚才讲的获取方法可能会让线程阻塞,比如说获取不到锁就会让线程进入阻塞状态,但是释放方法通常是不会阻塞线程的。
比如在 Semaphore 信号量里面,释放就是 release 方法(包含重载方法),release() 方法的作用是去释放一个许可证,会让 state 加 1;而在 CountDownLatch 里面,释放就是 countDown 方法,作用是倒数一个数,让 state 减 1。所以也可以看出,在不同的实现类里面,他们对于 state 的操作是截然不同的,需要由每一个协作类根据自己的逻辑去具体实现。
AQS作用
AQS 是一个用于构建锁、同步器等线程协作工具类的框架,有了 AQS 以后,很多用于线程协作的工具类就都可以很方便的被写出来,有了 AQS 之后,可以让更上层的开发极大的减少工作量,避免重复造轮子,同时也避免了上层因处理不当而导致的线程安全问题,因为 AQS 把这些事情都做好了。总之,有了 AQS 之后,我们构建线程协作工具类就容易多了。
锁和协作类有共同点:阀门功能
就让我们从熟悉的类作为学习 AQS 的切入点,请你先来思考一下,之前学过的 ReentrantLock 和 Semaphore,二者之间有没有什么共同点?
其实它们都可以当做一个阀门来使用。比如我们把 Semaphore 的许可证数量设置为 1,那么由于它只有一个许可证,所以只能允许一个线程通过,并且当之前的线程归还许可证后,会允许其他线程继续获得许可证。其实这点和 ReentrantLock 很像,只有一个线程能获得锁,并且当这个线程释放锁之后,会允许其他的线程获得锁。那如果线程发现当前没有额外的许可证时,或者当前得不到锁,那么线程就会被阻塞,并且等到后续有许可证或者锁释放出来后,被唤醒,所以这些环节都是比较类似的。
除了上面讲的 ReentrantLock 和 Semaphore 之外,我们会发现 CountDownLatch、ReentrantReadWriteLock 等工具类都有类似的让线程“协作”的功能,其实它们背后都是利用 AQS 来实现的。
为什么需要 AQS
有了上面的铺垫,现在就让我们来想一下,为什么需要 AQS?
原因是,上面刚讲的那些协作类,它们有很多工作是类似的,所以如果能把实现类似工作的代码给提取出来,变成一个新的底层工具类(或称为框架)的话,就可以直接使用这个工具类来构建上层代码了,而这个工具类其实就是 AQS。
有了 AQS 之后,对于 ReentrantLock 和 Semaphore 等线程协作工具类而言,它们就不需要关心这么多的线程调度细节,只需要实现它们各自的设计逻辑即可。
如果没有 AQS
那我们再尝试逆向思考一下,如果没有 AQS 会怎么样?
如果没有 AQS,那就需要每个线程协作工具类自己去实现至少以下内容,包括:
- 状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
这里的状态对于不同的工具类而言,代表不同的含义,比如对于 ReentrantLock 而言,它需要维护锁被重入的次数,但是保存重入次数的变量是会被多线程同时操作的,就需要进行处理,以便保证线程安全。不仅如此,对于那些未抢到锁的线程,还应该让它们陷入阻塞,并进行排队,并在合适的时机唤醒。所以说这些内容其实是比较繁琐的,而且也是比较重复的,而这些工作目前都由 AQS 来承担了。
如果没有 AQS,就需要 ReentrantLock 等类来自己实现相关的逻辑,但是让每个线程协作工具类自己去正确并且高效地实现这些内容,是相当有难度的。AQS 可以帮我们把 “脏活累活” 都搞定,所以对于 ReentrantLock 和 Semaphore 等类而言,它们只需要关注自己特有的业务逻辑即可。正所谓是“哪有什么岁月静好,不过是有人替你负重前行”。
AQS 的内部实现
AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。
每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS队列中去;
当获取锁的线程释放锁以后,会从Node队列头部中唤醒一个阻塞的节点(线程),然后这个节点会尝试获取锁,如果获取成功这个节点就从Node队列里面删除。
Node是抢占锁失败的线程
1.原理
AQS和Condition各自维护了不同的队列,在使用lock和condition的时候,其实就是两个队列的互相移动。如果我们想自定义一个同步器,可以实现AQS。它提供了获取共享锁和互斥锁的方式,都是基于对state操作而言的。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
Node节点的组成
释放锁以及添加线程对于队列的变化
当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。
里会涉及到两个变化
1. 新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
2. 通过 CAS 讲 tail 重新指向新的尾部节点head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下
这个过程也是涉及到两个变化
1. 修改 head 节点指向下一个获得锁的节点
2. 新的获得锁的节点,将 prev 的指针指向 null设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可
哪些组件用到了AQS
CountDownLatch Semaphore CyclicBarrier ReentrantLock Condition FutureTask
- Semaphore(信号量)-允许多个线程同时访问: synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
- CountDownLatch (倒计时器): CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
- CyclicBarrier(循环栅栏): CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
基本属性
AQS 的属性可简单分为四类:同步器简单属性、同步队列属性、条件队列属性、公用 Node。
简单属性
首先我们来看一下简单属性有哪些:
// 同步器的状态,子类会根据状态字段进行判断是否可以获得锁
// 比如 CAS 成功给 state 赋值 1 算得到锁,赋值失败为得不到锁, CAS 成功给 state 赋值 0 算释放锁
// 可重入锁,每次获得锁 +1,每次释放锁 -1
private volatile int state;
// 自旋超时阀值,单位纳秒
// 当设置等待时间时才会用到这个属性
static final long spinForTimeoutThreshold = 1000L;
最重要的就是 state 属性,是 int 属性的,所有继承 AQS 的锁都是通过这个字段来判断能不能获得锁,能不能释放锁。
同步队列属性
首先我们介绍以下同步队列:当多个线程都来请求锁时,某一时刻有且只有一个线程能够获得锁(排它锁),那么剩余获取不到锁的线程,都会到同步队列中去排队并阻塞自己,当有线程主动释放锁时,就会从同步队列头开始释放一个排队的线程,让线程重新去竞争锁。
所以同步队列的主要作用阻塞获取不到锁的线程,并在适当时机释放这些线程。
同步队列底层数据结构是个双向链表,我们从源码中可以看到链表的头尾,如下:
// 同步队列的头。
private transient volatile Node head;
// 同步队列的尾
private transient volatile Node tail;
源码中的 Node 是同步队列中的元素,但 Node 被同步队列和条件队列公用,所以我们在说完条件队列之后再说 Node。
条件队列的属性
首先我们介绍下条件队列:条件队列和同步队列的功能一样,管理获取不到锁的线程,底层数据
结构也是链表队列,但条件队列不直接和锁打交道,但常常和锁配合使用,是一定的场景下,对
锁功能的一种补充。
条件队列的属性如下:
// 条件队列,从属性上可以看出是链表结构
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列中第一个 node
private transient Node firstWaiter;
// 条件队列中最后一个 node
private transient Node lastWaiter;
}
ConditionObject 我们就称为条件队列,我们需要使用时,直接 new ConditionObject () 即可。
ConditionObject 是实现 Condition 接口的,Condition 接口相当于 Object 的各种监控方法,比如 Object#wait ()、Object#notify、Object#notifyAll 这些方法,我们可以先这么理解,后面会细说。
Node
Node 非常重要,即是同步队列的节点,又是条件队列的节点,在入队的时候,我们用 Node把线程包装一下,然后把 Node 放入两个队列中,我们看下 Node 的数据结构,如下:
static final class Node {
/**
* 同步队列单独的属性
*/
//node 是共享模式
static final Node SHARED = new Node();
//node 是排它模式
static final Node EXCLUSIVE = null;
// 当前节点的前节点
// 节点 acquire 成功后就会变成head
// head 节点不能被 cancelled
volatile Node prev;
// 当前节点的下一个节点
volatile Node next;
/**
* 两个队列共享的属性
*/
// 表示当前节点的状态,通过节点的状态来控制节点的行为
// 普通同步节点,就是 0 ,条件节点是 CONDITION -2
volatile int waitStatus;
// waitStatus 的状态有以下几种
// 被取消
static final int CANCELLED = 1;
// SIGNAL 状态的意义:同步队列中的节点在自旋获取锁的时候,如果前一个节点的状态是 SIGNA
static final int SIGNAL = -1;
// 表示当前 node 正在条件队列中,当有节点从同步队列转移到条件队列时,状态就会被更改成 CO
static final int CONDITION = -2;
// 无条件传播,共享模式下,该状态的进程处于可运行状态
static final int PROPAGATE = -3;
// 当前节点的线程
volatile Thread thread;
// 在同步队列中,nextWaiter 并不真的是指向其下一个节点,我们用 next 表示同步队列的下一个
// 但在条件队列中,nextWaiter 就是表示下一个节点元素
Node nextWaiter;
}
从 Node 的结构中,我们需要重点关注 waitStatus 字段,Node 的很多操作都是围绕着waitStatus 字段进行的。
Node 的 pre、next 属性是同步队列中的链表前后指向字段,nextWaiter 是条件队列中下一个节点的指向字段,但在同步队列中,nextWaiter 只是一个标识符,表示当前节点是共享还是排它模式。
Condition
刚才我们看条件队列 ConditionObject 时,发现其是实现 Condition 接口的,现在我们一起来
看下 Condition 接口,其类注释上是这么写的:
- 当 lock 代替 synchronized 来加锁时,Condition 就可以用来代替 Object 中相应的监控方
法了,比如 Object#wait ()、Object#notify、Object#notifyAll 这些方法;
2. 提供了一种线程协作方式:一个线程被暂停执行,直到被其它线程唤醒;
3. Condition 实例是绑定在锁上的,通过 Lock#newCondition 方法可以产生该实例;
4. 除了特殊说明外,任意空值作为方法的入参,都会抛出空指针;
5. Condition 提供了明确的语义和行为,这点和 Object 监控方法不同。
类注释上甚至还给我们举了一个例子:
假设我们有一个有界边界的队列,支持 put 和 take 方法,需要满足:
1:如果试图往空队列上执行 take,线程将会阻塞,直到队列中有可用的元素为止;
2:如果试图往满的队列上执行 put,线程将会阻塞,直到队列中有空闲的位置为止。
1、2 中线程阻塞都会到条件队列中去阻塞。
take 和 put 两种操作如果依靠一个条件队列,那么每次只能执行一种操作,所以我们可以新建
两个条件队列,这样就可以分别执行操作了,看了这个需求,是不是觉得很像我们第三章学习的
队列?实际上注释上给的 demo 就是我们学习过的队列,篇幅有限,感兴趣的可以看看
ConditionDemo 这个测试类。
除了类注释,Condition 还定义出一些方法,这些方法奠定了条件队列的基础,方法主要有:
void await() throws InterruptedException;
这个方法的主要作用是:使当前线程一直等待,直到被 signalled 或被打断。
当以下四种情况发生时,条件队列中的线程将被唤醒
1. 有线程使用了 signal 方法,正好唤醒了条件队列中的当前线程;
2. 有线程使用了 signalAll 方法;
3. 其它线程打断了当前线程,并且当前线程支持被打断;
4. 被虚假唤醒 (即使没有满足以上 3 个条件,wait 也是可能被偶尔唤醒,虚假唤醒定义可以参
考: https://en.wikipedia.org/wiki/Spurious_wakeup))。
被唤醒时,有一点需要注意的是:线程从条件队列中苏醒时,必须重新获得锁,才能真正被唤
醒,这个我们在说源码的时候,也会强调这个。
await 方法还有带等待超时时间的,如下:
// 返回的 long 值表示剩余的给定等待时间,如果返回的时间小于等于 0 ,说明等待时间过了
// 选择纳秒是为了避免计算剩余等待时间时的截断误差
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 虽然入参可以是任意单位的时间,但底层仍然转化成纳秒
boolean await(long time, TimeUnit unit) throws InterruptedException;
除了等待方法,还是唤醒线程的两个方法,如下:
// 唤醒条件队列中的一个线程,在被唤醒前必须先获得锁
void signal();
// 唤醒条件队列中的所有线程
void signalAll();
同步器的状态
在同步器中,我们有两个状态,一个叫做 state,一个叫做 waitStatus,两者是完全不同的概念:
1. state 是锁的状态,是 int 类型,子类继承 AQS 时,都是要根据 state 字段来判断有无得到锁,比如当前同步器状态是 0,表示可以获得锁,当前同步器状态是 1,表示锁已经被其他线程持有,当前线程无法获得锁;
2. waitStatus 是节点(Node)的状态,种类很多,一共有初始化 (0)、CANCELLED (1)、SIGNAL (-1)、CONDITION (-2)、PROPAGATE (-3),各个状态的含义可以见上文。
这两个状态我们需要牢记,不要混淆了。
获取锁
获取锁最直观的感受就是使用 Lock.lock () 方法来获得锁,最终目的是想让线程获得对资源的访
问权。
Lock 一般是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或 tryAcquire
方法。
acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现,acquire 方法制定了获取
锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。
tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的
state 状态来决定是否能够获得锁,接下来我们详细看下 acquire 的源码解析。
acquire 也分两种,一种是排它锁,一种是共享锁,我们一一来看下:
3.1 acquire 排它锁
// 排它模式下,尝试获得锁
public final void acquire(int arg) {
// tryAcquire 方法是需要实现类去实现的,实现思路一般都是 cas 给 state 赋值来决定是否能获得
if (!tryAcquire(arg) &&
// addWaiter 入参代表是排他模式
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代码的主要步骤是(流程见整体架构图中红色场景):
1. 尝试执行一次 tryAcquire,如果成功直接返回,失败走 2;
2. 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾;
3. 接着调用 acquireQueued 方法,两个作用,1:阻塞当前节点,2:节点被唤醒时,使其能
够获得锁;
4. 如果 2、3 失败了,打断线程。
3.1.1 addWaiter
// 方法主要目的:node 追加到同步队列的队尾
// 入参 mode 表示 Node 的模式(排它模式还是共享模式)
// 出参是新增的 node
// 主要思路:
// 新 node.pre = 队尾
// 队尾.next = 新 node
private Node addWaiter(Node mode) {
// 初始化 Node
Node node = new Node(Thread.currentThread(), mode);
// 这里的逻辑和 enq 一致,enq 的逻辑仅仅多了队尾是空,初始化的逻辑
// 这个思路在 java 源码中很常见,先简单的尝试放一下,成功立马返回,如果不行,再 while 循环
// 很多时候,这种算法可以帮忙解决大部分的问题,大部分的入队可能一次都能成功,无需自旋
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//自旋保证node加入到队尾
enq(node);
return node;
}
// 线程加入同步队列中方法,追加到队尾
// 这里需要重点注意的是,返回值是添加 node 的前一个节点
private Node enq(final Node node) {
for (;;) {
// 得到队尾节点
Node t = tail;
// 如果队尾为空,说明当前同步队列都没有初始化,进行初始化
// tail = head = new Node();
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
// 队尾不为空,将当前节点追加到队尾
} else {
node.prev = t;
// node 追加到队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果之前学习过队列的同学,对这个方法应该感觉毫不吃力,就是把新的节点追加到同步队列的
队尾。
其中有一点值得我们学习的地方,是在 addWaiter 方法中,并没有进入方法后立马就自旋,而
是先尝试一次追加到队尾,如果失败才自旋,因为大部分操作可能一次就会成功,这种思路在我
们写自旋的时候可以借鉴。
3.1.2 acquireQueued
下一步就是要阻塞当前线程了,是 acquireQueued 方法来实现的,我们来看下源码实现:
// 主要做两件事情:
// 1:通过不断的自旋尝试使自己前一个节点的状态变成 signal,然后阻塞自己。
// 2:获得锁的线程执行完成之后,释放锁时,会把阻塞的 node 唤醒,node 唤醒之后再次自旋,尝试
// 返回 false 表示获得锁成功,返回 true 表示失败
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 选上一个节点
final Node p = node.predecessor();
// 有两种情况会走到 p == head:
// 1:node 之前没有获得锁,进入 acquireQueued 方法时,才发现他的前置节点就是头节点,
// 2:node 之前一直在阻塞沉睡,然后被唤醒,此时唤醒 node 的节点正是其前一个节点,也能
// 如果自己 tryAcquire 成功,就立马把自己设置成 head,把上一个节点移除
// 如果 tryAcquire 失败,尝试进入同步队列
if (p == head && tryAcquire(arg)) {
// 获得锁,设置成 head 节点
setHead(node);
//p被回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 把 node 的前一个节点状态置为 SIGNAL
// 只要前一个节点状态是 SIGNAL了,那么自己就可以阻塞(park)了
// parkAndCheckInterrupt 阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 线程是在这个方法里面阻塞的,醒来的时候仍然在无限 for 循环里面,就能再次自旋尝试
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获得node的锁失败,将 node 从队列中移除
if (failed)
cancelAcquire(node);
}
}
此方法的注释还是很清楚的,我们接着看下此方法的核心:shouldParkAfterFailedAcquire,这个方法的主要目的就是把前一个节点的状态置为 SIGNAL,只要前一个节点的状态是SIGNAL,当前节点就可以阻塞了(parkAndCheckInterrupt 就是使节点阻塞的方法),源码
如下:
// 当前线程可以安心阻塞的标准,就是前一个节点线程状态是 SIGNAL 了。
// 入参 pred 是前一个节点,node 是当前节点。
// 关键操作:
// 1:确认前一个节点是否有效,无效的话,一直往前找到状态不是取消的节点。
// 2: 把前一个节点状态置为 SIGNAL。
// 1、2 两步操作,有可能一次就成功,有可能需要外部循环多次才能成功(外面是个无限的 for 循环
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前一个节点 waitStatus 状态已经是 SIGNAL 了,直接返回,不需要在自旋了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果当前节点状态已经被取消了。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 找到前一个状态不是取消的节点,因为把当前 node 挂在有效节点身上
// 因为节点状态是取消的话,是无效的,是不能作为 node 的前置节点的,所以必须找到 node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 否则直接把节点状态置 为SIGNAL
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
acquire 整个过程非常长,代码也非常多,但注释很清楚,可以一行一行仔细看看代码。
总结一下,acquire 方法大致分为三步:
1. 使用 tryAcquire 方法尝试获得锁,获得锁直接返回,获取不到锁的走 2;
2. 把当前线程组装成节点(Node),追加到同步队列的尾部(addWaiter);
3. 自旋,使同步队列中当前节点的前置节点状态为 signal 后,然后阻塞自己。
整体的代码结构比较清晰,一些需要注意的点,都用注释表明了,强烈建议阅读下源码。
3.2 acquireShared 获取共享锁
acquireShared 整体流程和 acquire 相同,代码也很相似,重复的源码就不贴了,我们就贴出来不一样的代码来,也方便大家进行比较:
1. 第一步尝试获得锁的地方,有所不同,排它锁使用的是 tryAcquire 方法,共享锁使用的是tryAcquireShared 方法,如下图:
- 第二步不同,在于节点获得排它锁时,仅仅把自己设置为同步队列的头节点即可(setHead方 法 ) , 但 如 果 是 共 享 锁 的 话 , 还 会 去 唤 醒 自 己 的 后 续 节 点 , 一 起 来 获 得 该 锁(setHeadAndPropagate 方法),不同之处如下(左边排它锁,右边共享锁):
接下来我们一起来看下 setHeadAndPropagate 方法的源码:
// 主要做两件事情
// 1:把当前节点设置成头节点
// 2:看看后续节点有无正在等待,并且也是共享模式的,有的话唤醒这些节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 当前节点设置成头节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated(表示指示) by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism(保守) in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// propagate > 0 表示已经有节点获得共享锁了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//共享模式,还唤醒头节点的后置节点
if (s == null || s.isShared())
doReleaseShared();
}
}
// 释放后置共享节点
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// 还没有到队尾,此时队列中至少有两个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果队列状态是 SIGNAL ,说明后续节点都需要唤醒
if (ws == Node.SIGNAL) {
// CAS 保证只有一个节点可以运行唤醒的操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 进行唤醒操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 第一种情况,头节点没有发生移动,结束。
// 第二种情况,因为此方法可以被两处调用,一次是获得锁的地方,一处是释放锁的地方,
// 加上共享锁的特性就是可以多个线程获得锁,也可以释放锁,这就导致头节点可能会发生变化
// 如果头节点发生了变化,就继续循环,一直循环到头节点不变化时,结束循环。
if (h == head) // loop if head changed
break;
}
}
这个就是共享锁独特的地方,当一个线程获得锁后,它就会去唤醒排在它后面的其它节点,让其
它节点也能够获得锁。
释放锁
释放锁的触发时机就是我们常用的 Lock.unLock () 方法,目的就是让线程释放对资源的访问权
(流程见整体架构图紫色路线)。
释放锁也是分为两类,一类是排它锁的释放,一类是共享锁的释放,我们分别来看下。
1.1 释放排它锁 release
排它锁的释放就比较简单了,从队头开始,找它的下一个节点,如果下一个节点是空的,就会从
尾开始,一直找到状态不是取消的节点,然后释放该节点,源码如下:
// unlock 的基础方法
public final boolean release(int arg) {
// tryRelease 交给实现类去实现,一般就是用当前同步器状态减去 arg,如果返回 true 说明成功释
if (tryRelease(arg)) {
Node h = head;
// 头节点不为空,并且非初始化状态
if (h != null && h.waitStatus != 0)
// 从头开始唤醒等待锁的节点
unparkSuccessor(h);
return true;
}
return false;
}
// 很有意思的方法,当线程释放锁成功后,从 node 开始唤醒同步队列中的节点
// 通过唤醒机制,保证线程不会一直在同步队列中阻塞等待
private void unparkSuccessor(Node node) {
// node 节点是当前释放锁的节点,也是同步队列的头节点
int ws = node.waitStatus;
// 如果节点已经被取消了,把节点的状态置为初始化
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿出 node 节点的后面一个节点
Node s = node.next;
// s 为空,表示 node 的后一个节点为空
// s.waitStatus 大于0,代表 s 节点已经被取消了
// 遇到以上这两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的
if (s == null || s.waitStatus > 0) {
s = null;
// 这里从尾迭代,而不是从头开始迭代是有原因的。
// 主要是因为节点被阻塞的时候,是在 acquireQueued 方法里面被阻塞的,唤醒时也一定会在
for (Node t = tail; t != null && t != node; t = t.prev)
// t.waitStatus <= 0 说明 t 没有被取消,肯定还在等待被唤醒
if (t.waitStatus <= 0)
s = t;
}
// 唤醒以上代码找到的线程
if (s != null)
LockSupport.unpark(s.thread);
}
1.2 释放共享锁 releaseShared
释放共享锁的方法是 releaseShared,主要分成两步:
1. tryReleaseShared 尝试释放当前共享锁,失败返回 false,成功走 2;
2. 唤醒当前节点的后续阻塞节点,这个方法我们之前看过了,线程在获得共享锁的时候,就会
去唤醒其后面的节点,方法名称为:doReleaseShared。
我们一起来看下 releaseShared 的源码:
// 共享模式下,释放当前线程的共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法
doReleaseShared();
return true;
}
return false;
}
条件队列的重要方法
在看条件队列的方法之前,我们先得弄明白为什么有了同步队列,还需要条件队列?
主要是因为并不是所有场景一个同步队列就可以搞定的,在遇到锁 + 队列结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞;获得锁的多个线程在碰到队列满或者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒。
同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 队列的场景中。
所以说条件队列也是不可或缺的一环。
接下来我们来看一下条件队列一些比较重要的方法,以下方法都在 ConditionObject 内部类中。
2.1 入队列等待 await
获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,
这个动作我们叫做入条件队列,方法名称为 await,流程见整体架构图中深绿色箭头流向,我们
一起来看下 await 的源码:
// 线程入条件队列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入到条件队列的队尾
Node node = addConditionWaiter();
// 标记位置 A
// 加入条件队列后,会释放 lock 时申请的资源,唤醒同步队列队列头的节点
// 自己马上就要阻塞了,必须马上释放之前 lock 的资源,不然自己不被唤醒的话,别的线程永远得
int savedState = fullyRelease(node);
int interruptMode = 0;
// 确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的
// 目前想到的只有两种可能:
// 1:node 刚被加入到条件队列中,立马就被其他线程 signal 转移到同步队列中去了
// 2:线程之前在条件队列中沉睡,被唤醒后加入到同步队列中去
while (!isOnSyncQueue(node)) {
// this = AbstractQueuedSynchronizer$ConditionObject
// 阻塞在条件队列上
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 标记位置 B
// 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中的数据结构中去了
// 所以这里节点苏醒了,直接尝试 acquireQueued
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 如果状态不是CONDITION,就会自动删除
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await 方法有几点需要特别注意:
1. 上述代码标记位置 A 处,节点在准备进入条件队列之前,一定会先释放当前持有的锁,不然自己进去条件队列了,其余的线程都无法获得锁了;
- 上述代码标记位置 B 处,此时节点是被 Condition.signal 或者 signalAll 方法唤醒的,此时节点已经成功的被转移到同步队列中去了(整体架构图中蓝色流程),所以可以直接执行acquireQueued 方法;
3. Node 在条件队列中的命名,源码喜欢用 Waiter 来命名,所以我们在条件队列中看到Waiter,其实就是 Node。
await 方法中有两个重要方法:addConditionWaiter 和 unlinkCancelledWaiters,我们一一看下。
2.1.1 addConditionWaiter
addConditionWaiter 方法主要是把节点放到条件队列中,方法源码如下:
// 增加新的 waiter 到队列中,返回新添加的 waiter
// 如果尾节点状态不是 CONDITION 状态,删除条件队列中所有状态不是 CONDITION 的节点
// 如果队列为空,新增节点作为队列头节点,否则追加到尾节点上
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果尾部的 waiter 不是 CONDITION 状态了,删除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建条件队列 node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 队列是空的,直接放到队列头
if (t == null)
firstWaiter = node;
// 队列不为空,直接到队列尾部
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
整 体 过 程 比 较 简 单 , 就 是 追 加 到 队 列 的 尾 部 , 其 中 有 个 重 要 方 法 叫 做
unlinkCancelledWaiters,这个方法会删除掉条件队列中状态不是 CONDITION 的所有节点,
我们来看下 unlinkCancelledWaiters 方法的源码,如下:
2.1.2 unlinkCancelledWaiters
// 会检查尾部的 waiter 是不是已经不是CONDITION状态了
// 如果不是,删除这些 waiter
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// trail 表示上一个状态,这个字段作用非常大,可以把状态都是 CONDITION 的 node 串联起来,即
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 当前node的状态不是CONDITION,删除自己
if (t.waitStatus != Node.CONDITION) {
//删除当前node
t.nextWaiter = null;
// 如果 trail 是空的,咱们循环又是从头开始的,说明从头到当前节点的状态都不是 CONDIT
// 都已经被删除了,所以移动队列头节点到当前节点的下一个节点
if (trail == null)
firstWaiter = next;
// 如果找到上次状态是CONDITION的节点的话,先把当前节点删掉,然后把自己挂到上一个
else
trail.nextWaiter = next;
// 遍历结束,最后一次找到的CONDITION节点就是尾节点
if (next == null)
lastWaiter = trail;
}
// 状态是 CONDITION 的 Node
else
trail = t;
// 继续循环,循环顺序从头到尾
t = next;
}
}
为了方便大家理解这个方法,画了一个释义图,如下:
2.2 单个唤醒 signal
signal 方法是唤醒的意思,比如之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队
列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线
程,会从条件队列的头节点开始唤醒(流程见整体架构图中蓝色部分),源码如下:
// 唤醒阻塞在条件队列中的节点
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 从头节点开始唤醒
Node first = firstWaiter;
if (first != null)
// doSignal 方法会把条件队列中的节点转移到同步队列中去
doSignal(first);
}
// 把条件队列头节点转移到同步队列去
private void doSignal(Node first) {
do {
// nextWaiter为空,说明到队尾了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 从队列头部开始唤醒,所以直接把头节点.next 置为 null,这种操作其实就是把 node 从条件队
// 这里有个重要的点是,每次唤醒都是从队列头部开始唤醒,所以把 next 置为 null 没有关系,
first.nextWaiter = null;
// transferForSignal 方法会把节点转移到同步队列中去
// 通过 while 保证 transferForSignal 能成功
// 等待队列的 node 不用管他,在 await 的时候,会自动清除状态不是 Condition 的节点(通过
// (first = firstWaiter) != null = true 的话,表示还可以继续循环, = false 说明队列中的元素
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
我们来看下最关键的方法:transferForSignal。
// 返回 true 表示转移成功, false 失败
// 大概思路:
// 1. node 追加到同步队列的队尾
// 2. 将 node 的前一个节点状态置为 SIGNAL,成功直接返回,失败直接唤醒
// 可以看出来 node 的状态此时是 0 了
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 将 node 的状态从 CONDITION 修改成初始化,失败返回 false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 当前队列加入到同步队列,返回的 p 是 node 在同步队列中的前一个节点
// 看命名是 p,实际是 pre 单词的缩写
Node p = enq(node);
int ws = p.waitStatus;
// 状态修改成 SIGNAL,如果成功直接返回
// 把当前节点的前一个节点修改成 SIGNAL 的原因,是因为 SIGNAL 本身就表示当前节点后面的节
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果 p 节点被取消,或者状态不能修改成SIGNAL,直接唤醒
LockSupport.unpark(node.thread);
return true;
}
整个源码下来,我们可以看到,唤醒条件队列中的节点,实际上就是把条件队列中的节点转移到同步队列中,并把其前置节点状态置为 SIGNAL。
2.3 全部唤醒 signalAll
signalAll 的作用是唤醒条件队列中的全部节点,源码如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到头节点
Node first = firstWaiter;
if (first != null)
// 从头节点开始唤醒条件队列中所有的节点
doSignalAll(first);
}
// 把条件队列所有节点依次转移到同步队列去
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 拿出条件队列队列头节点的下一个节点
Node next = first.nextWaiter;
// 把头节点从条件队列中删除
first.nextWaiter = null;
// 头节点转移到同步队列中去
transferForSignal(first);
// 开始循环头节点的下一个节点
first = next;
} while (first != null);
}
从源码中可以看出,其本质就是 for 循环调用 transferForSignal 方法,将条件队列中的节点循环转移到同步队列中去。
AQS主要方法
getState():返回同步状态的当前值;
setState(int newState):设置当前同步状态;
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
tryRelease(int arg):独占式释放同步状态;
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
tryReleaseShared(int arg):共享式释放同步状态;
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
releaseShared(int arg):共享式释放同步状态;