1. 谈谈你对 AQS 的理解?
AQS 的全称是 AbstractQueuedSynchronizer,这个类在 java.util.concurrent.locks 包下。AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量同步器。如 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。
AQS 底层使用了模板方法模式,自定义同步器一般的方式是这样:
- 使用者继承 AQS 并重写指定的方法(比较简单,只是对于共享资源 state 的获取和释放)。
- 将 AQS 组合在自定义同步器的实现中,并调用其模版方法,而这些模板方法会调用使用者重写的方法。
一般来说,基于 AQS 实现的同步器都会使用 组合优先于继承 的原则,在内部声明一个内部私有的继承于 AQS 的子类 Sync,对这个同步器的所有公有方法的调用都会委托给这个内部子类。AQS 是实现锁的关键,锁是面向使用者的,而同步器是面向锁的实现者的。
2. AQS 的接口与示例
重写同步器指定的方法时,需要使用同步器提供的如下 3 个方法来访问或修改同步状态:
- getState():获取当前同步状态。
- setState():设置当前同步状态。
- compareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。
同步器可重写的方法有:tryAcquire(),tryRelease(),tryAcquireShared(),tryReleaseShared(),isHeldExclusive()。
同步器提供的模版方法基本上分为 3 类:
- 独占式获取与释放同步状态。
- 共享式获取与释放同步状态。
- 查询同步队列中的等待线程情况。
3. AQS 的实现分析
同步器完成线程同步主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放、超时获取同步状态。3.1 同步队列
同步器依赖于内部的同步队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程。首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
在同步器中,我们有两个状态,一个叫做 state,一个叫做 waitStatus,两者是完全不同的概念:
- state 是锁的状态,是 int 类型,子类继承 AQS 时,都是要根据 state 字段来判断有无得到锁,比如当前同步器状态是 0,表示可以获得锁,当前同步器状态是 1,表示锁已经被其他线程持有,当前线程无法获得锁;
waitStatus 是节点(Node)的状态,种类很多,一共有初始化 (0)、CANCELLED (1)、SIGNAL (-1)、CONDITION (-2)、PROPAGATE (-3),各个状态的含义可以见上文。
3.2 独占式同步状态获取
AQS 通过 acquire() 方法调用 tryAcquire() 方法获取锁,acquire() 方法 AQS 已经实现了,tryAcquire() 方法需要 AQS 的子类去实现。子类可以通过 state 状态来决定是否能够获取锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代码的主要步骤是:
尝试执行一次 tryAcquire(),如果成功直接返回。
- 如果
tryAcquire(arg)
执行失败,代表获取锁失败,则执行acquireQueued()
方法,将线程加入 FIFO 等待队列。acquireQueued()
首先会调用addWaiter(Node.EXCLUSIVE)
将包含有当前线程的 Node 节点入队,Node.EXCLUSIVE 代表此结点为独占模式。 - 线程尝试进入同步队列,调用 addWaiter() 方法,把当前线程组装成节点(Node)并添加到同步队列的队尾。
- 接着调用 acquireQueued() 方法,通过不断自旋获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
3.2.1 acquireQueued() 剖析
先来看看addWaiter()
是怎么实现的:
这段逻辑比较清楚,首先是获取 FIFO 队列的尾结点,如果尾结点存在,则采用 CAS 的方式将等待线程入队,如果尾结点为空则执行private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果尾结点不为空,则用 CAS 将获取锁失败的线程入队
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果结点为空,执行 enq 方法
enq(node);
return node;
}
enq()
方法。
在addWaiter()
方法中,并没有进入方法后立马就自旋,而是先尝试一次追加到队尾,如果失败才自旋,因为大部分操作可能一次就可以成功。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 尾结点为空,说明 FIFO 队列未初始化,所以先初始化其头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾结点不为空,则将等待线程入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}
首先判断 tail 是否为空,如果为空说明 FIFO 队列的 head,tail 还未构建,此时先构建头结点,构建之后再用 CAS 的方式将此线程结点入队。
acquireQueued()
中体现了 AQS 对线程自旋的处理:如果当前节点的上一个节点不为 head,且它的状态为 SIGNAL,则节点进入阻塞状态。来看具体的代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前一个节点是 head,则尝试自旋获取锁
if (p == head && tryAcquire(arg)) {
// 将 head 结点指向当前节点,原 head 结点出队
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前一个节点不是 head 或者竞争锁失败,则进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果线程自旋中因为异常等原因获取锁最终失败,则调用此方法
cancelAcquire(node);
}
}
3.3 独占式同步状态释放
排它锁的释放就比较简单了,从队头开始,找它的下一个节点,如果下一个节点是空的,就会从尾开始,一直找到状态不是取消的节点,然后释放该节点。不管是公平锁还是非公平锁,最终都是调的 AQS 的如下模板方法来释放锁:
public final boolean release(int arg) {
// tryRelease 交给实现类去实现,一般就是用当前同步器状态减去 arg,如果返回 true 说明成功释放锁。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 从头开始唤醒等待锁的节点
unparkSuccessor(h);
return true;
}
return false;
}
4. CountDownLatch 和 CyclicBarrier 的区别
CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
- CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行。
- CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
5. 谈谈对 CopyOnWriteArrayList 的理解?
在很多应用场景中,读操作的频率可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁是一种资源浪费。CopyOnWriteArrayList 在对一块内存进行修改时,不在原有内存块中进行操作,而是将内存拷贝一份,在新的内存中进行写操作。写完之后,将原来内存指针指向新的内存,原来的内存就可以被回收了。CopyOnWriteArrayList 读取操作没有任何同步控制,理由就是内部数组 array 不会发生修改,只会被另一个 array 替换,这样就保证了数据安全。
6. ReentrantLock 锁
6.1 类注释
来看看 AQS 的重要实现类:ReentrantLock,看看其底层是如何组合 AQS,实现了自己的那些功能。ReentrantLock 中文我们习惯叫做可重入互斥锁,可重入的意思是同一个线程可以对同一个共享资源重复的加锁或释放锁,互斥就是 AQS 中的排它锁的意思,只允许一个线程获得锁。
我们来一起来看下类注释上都有哪些重要信息:
- 可重入互斥锁,和 synchronized 锁具有同样的功能语义,但更有扩展性;
- 构造器接受 fair 的参数,fair 是 ture 时,保证获得锁时的顺序,false 不保证;
- 公平锁的吞吐量较低,获得锁的公平性不能代表线程调度的公平性;
- tryLock() 无参方法没有遵循公平性,是非公平的(lock 和 unlock 都有公平和非公平,而 tryLock 只有公平锁,所以单独拿出来说一说)。
6.2 类结构
ReentrantLock 类本身是不继承 AQS 的,实现了 Lock 接口,如下:
Lock 接口定义了各种加锁,释放锁的方法,接口有如下几个:public class ReentrantLock implements Lock, java.io.Serializable {}
ReentrantLock 就负责实现这些接口,我们使用时,直接面对的也是这些方法,这些方法的底层实现都是交给 Sync 内部类去实现的,Sync 类的定义如下:// 获得锁方法,获取不到锁的线程会到同步队列中阻塞排队 void lock(); // 获取可中断的锁 void lockInterruptibly() throws InterruptedException; // 尝试获得锁,如果锁空闲,立马返回 true,否则返回 false boolean tryLock(); // 带有超时等待时间的锁,如果超时时间到了,仍然没有获得锁,返回 false boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 释放锁 void unlock(); // 得到新的 Condition Condition newCondition();
Sync 继承了 AbstractQueuedSynchronizer ,所以 Sync 就具有了锁的框架,根据 AQS 的框架,Sync 只需要实现 AQS 预留的几个方法即可,但 Sync 也只是实现了部分方法,还有一些交给子类 NonfairSync 和 FairSync 去实现了,NonfairSync 是非公平锁,FairSync 是公平锁,定义如下:abstract static class Sync extends AbstractQueuedSynchronizer {}
几个类整体的结构如下:// 同步器 Sync 的两个子类锁 static final class FairSync extends Sync {} static final class NonfairSync extends Sync {}
图中 Sync、NonfairSync、FairSync 都是静态内部类的方式实现的,这个也符合 AQS 框架定义的实现标准。6.3 构造器
ReentrantLock 构造器有两种,代码如下: ```java // 无参数构造器,相当于 ReentrantLock(false),默认是非公平的 public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
无参构造器默认构造是非公平的锁,有参构造器可以选择。从构造器中可以看出,公平锁是依靠 FairSync 实现的,非公平锁是依靠 NonfairSync 实现的。
<a name="126c2767"></a>
## 6.4 Sync 同步器
Sync 表示同步器,继承了 AQS,UML 图如下:<br /><br />从 UML 图中可以看出,lock 方法是个抽象方法,留给 FairSync 和 NonfairSync 两个子类去实现,我们一起来看下剩余重要的几个方法。
<a name="aca4c4a1"></a>
### 6.4.1 nonfairTryAcquire
```java
// 尝试获得非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果同步器的状态是 0,表示此时资源是空闲的(即锁是释放的),再用CAS获取锁
if (c == 0) {
// 让当前线程用CAS获取锁
if (compareAndSetState(0, acquires)) {
// 标记当前持有锁的线程是谁
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已经持有锁了,同一个线程可以对同一个资源重复加锁,代码实现的是可重入锁
else if (current == getExclusiveOwnerThread()) {
// 当前线程持有锁的数量 + acquires
int nextc = c + acquires;
// int 是有最大值的,<0 表示持有锁的数量超过了 int 的最大值
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//否则线程进入同步队列
return false;
}
此段代码可知锁的获取主要分两种情况:
- state 为 0 时,代表锁已经被释放,可以去获取,于是使用 CAS 去重新获取锁资源,如果获取成功,则代表竞争锁成功,使用 setExclusiveOwnerThread(current) 记录下此时占有锁的线程,看到这里的 CAS,大家应该不难理解为啥当前实现是非公平锁了,因为队列中的线程与新线程都可以 CAS 获取锁啊,新来的线程不需要排队
- 如果 state 不为 0,代表之前已有线程占有了锁,如果此时的线程依然是之前占有锁的线程(current == getExclusiveOwnerThread() 为 true),代表此线程再一次占有了锁(可重入锁),此时更新 state,记录下锁被占有的次数(锁的重入次数),这里的 setState 方法不需要使用 CAS 更新,因为此时的锁就是当前线程占有的,其他线程没有机会进入这段代码执行。所以此时更新 state 是线程安全的。
6.4.2 tryRelease
// 释放锁方法,非公平和公平锁都使用 protected final boolean tryRelease(int releases) { // 当前同步器的状态减去释放的个数,releases 一般为 1 int c = getState() - releases; // 当前线程根本都不持有锁,报错 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果 c 为 0,表示当前线程持有的锁都释放了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 如果 c 不为 0,那么就是可重入锁,并且锁没有释放完,用 state 减去 releases 即可,无需做其他操作 setState(c); return free; }
tryRelease 方法是公平锁和非公平锁都公用的,在锁释放的时候,是没有公平和非公平的说法的。
从代码中可以看到,锁最终被释放的标椎是 state 的状态为 0,在重入加锁的情况下,需要重入解锁相应的次数后,才能最终把锁释放,比如线程 A 对共享资源 B 重入加锁 5 次,那么释放锁的话,也需要释放 5 次之后,才算真正的释放该共享资源了。
6.5 FairSync 公平锁
FairSync 公平锁只实现了 lock 和 tryAcquire 两个方法,lock 方法非常简单,如下:
// acquire 是 AQS 的方法,表示先尝试获得锁,失败之后进入同步队列阻塞等待
final void lock() {
acquire(1);
}
tryAcquire 方法是 AQS 在 acquire 方法中留给子类实现的抽象方法,FairSync 中实现的源码如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors 是实现公平的关键
// 会判断当前线程是不是属于同步队列的头节点的下一个节点(头节点是释放锁的节点)
// 如果是(返回false),符合先进先出的原则,可以获得锁
// 如果不是(返回true),则继续等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
代码和 Sync 的 nonfairTryAcquire 方法实现类似,唯一不同的是在获得锁时使用 hasQueuedPredecessors 方法体现了其公平性。
6.6 NonfairSync 非公平锁
NonfairSync 底层实现了 lock 和 tryAcquire 两个方法,如下:
// 加锁
final void lock() {
// cas 给 state 赋值
if (compareAndSetState(0, 1))
// cas 赋值成功,代表拿到当前锁,记录拿到锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// acquire 是抽象类AQS的方法,
// 会再次尝试获得锁,失败会进入到同步队列中
acquire(1);
}
// 直接使用的是 Sync.nonfairTryAcquire 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
6.7 如何串起来
以上内容主要说了 ReentrantLock 的基本结构,比较零散,那么这些零散的结构如何串联起来呢?我们是通过 lock、tryLock、unlock 这三个 API 将以上几个类串联起来,我们来一一看下。
6.7.1 lock 加锁
lock 的代码实现:
public void lock() {
sync.lock();
}
其底层的调用关系(只是简单表明调用关系,并不是完整分支图)如下:
6.7.2 tryLock 尝试加锁
tryLock 有两个方法,一种是无参的,一种提供了超时时间的入参,两种内部是不同的实现机制,代码分别如下:
// 无参构造器
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
// timeout 为超时的时间,在时间内,仍没有得到锁,会返回 false
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
6.7.3 unlock 释放锁
unlock 释放锁的方法,底层调用的是 Sync 同步器的 release 方法,release 是 AQS 的方法,分成两步:
- 尝试释放锁,如果释放失败,直接返回 false;
- 释放成功,从同步队列的头节点的下一个节点开始唤醒,让其去竞争锁。
unLock 的源码如下:
// 释放锁
public void unlock() {
sync.release(1);
}
6.7.4 Condition
ReentrantLock 对 Condition 并没有改造,直接使用 AQS 的 ConditionObject 即可。
6.8 synchronized 和 ReentrantLock 的区别
- 两者都是可重入锁
- synchronized是 JVM 层面,底层是通过 monitor 对象来实现,wait/notify 等方法都依赖于 monitor 对象。而 ReentrantLock 是具体的一个接口,是 API 层面的锁。
- synchronized 不需要用户手动释放锁,ReentrantLock 需要手动释放锁,不然可能导致死锁。
- ReentrantLock 增加了一些高级功能:
- ReentrantLock 提供了一种能够中断正在等待的线程的机制。
- ReentrantLock 可以指定锁是公平的还是不公平的。
- ReentrantLock 可以精准唤醒某个线程,而synchronized只能全部或者随机唤醒。
ReentrantLock 并不是用来取代 synchroinized,而是应该在 synchroinized 无法满足我们需求的时候才使用 ReentrantLock。
6.9 ReentrantLock 在功能上是 synchronized 的超集,那 synchronized 还有使用的必要吗?
synchronized 是关键字,而 ReentrantLock 是类,这是二者的本质区别。synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API。
基于以下理由,仍然推荐在 synchronized 与 ReentrantLock 都可满足需要时优先使用 synchronized:
- synchronized 是在 Java 语法层面的同步,足够清晰,也足够简单。每个 Java 程序员都熟悉 synchronized,但 J.U.C 中的 Lock 接口则并非如此。因此在只需要基础的同步功能时,更推荐 synchronized。
- Lock 应该确保在 finally 块中释放锁,否则一旦受同步保护的代码块中抛出异常,则有可能永远不会释放持有的锁。这一点必须由程序员自己来保证,而使用 synchronized 的话则可以由 Java 虚拟机来确保即使出现异常,锁也能被自动释放。
尽管在 JDK 5 时代 ReentrantLock 曾经在性能上领先过 synchronized,但这已经是十多年之前的胜利了。从长远来看,Java 虚拟机更容易针对 synchronized 来进行优化,因为 Java 虚拟机可以在线程和对象的元数据中记录 synchronized 中锁的相关信息,而使用 J.U.C 中的 Lock 的话,Java 虚拟机是很难得知具体哪些锁对象是由特定线程锁持有的。
7. 如何利用 AQS 自定义一个互斥锁?
AQS 通过提供 state 及 FIFO 队列的管理,为我们提供了一套通用的实现锁的底层方法,基本上定义一个锁,都是转为在其内部定义 AQS 的子类,调用 AQS 的底层方法来实现的,由于 AQS 在底层已经为了定义好了这些获取 state 及 FIFO 队列的管理工作,我们要实现一个锁就比较简单了,我们可以基于 AQS 来实现一个非可重入的互斥锁,如下所示:
publicclass Mutex { private Sync sync = new Sync(); public void lock () { sync.acquire(1); } public void unlock () { sync.release(1); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { return compareAndSetState(0, 1); } @Override protected boolean tryRelease (int arg) { setState(0); return true; } @Override protected boolean isHeldExclusively () { return getState() == 1; } } }