在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都能够解决。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
Java 语言提供的 synchronized 也是管程的一种实现,Lock 提供了与 synchronized 类似的同步功能,但在使用时需要显式地加锁(lock)和解锁(unlock)。同时为了避免锁泄漏,我们必须在 finally 块中释放锁,保证在获取到锁之后,最终能够被释放。注意,不要将获取锁的过程写在 try 块中,因为如果在获取锁时发生了异常,异常抛出的同时,也会导致锁无故释放,典型的代码结构如下,这是个良好的习惯。
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
//......
} finally {
lock.unlock();
}
与 synchronized 的区别
1)异常处理
synchronized 在发生异常时,会自动释放线程占有的锁,不会发生死锁。Lock 在发生异常时,若没有主动通过unlock() 方法释放锁,则很可能造成死锁,所以一般在 finally 块中写 unlock() 以防死锁。
2)能够响应中断
synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号时能够唤醒它,那它就有机会释放曾经持有的锁 A。
Lock 提供了一个 locklnterruptibly() 方法用来获得锁,但会优先响应中断。当在等待获取锁的过程中被外部中断时,会抛出 InterruptedException 异常,同时会释放已获得的锁。
3)支持非阻塞获取、超时获取锁
Lock 提供了 tryLock() 方法用于尝试申请相应 Lock 实例的锁。如果锁未被其他线程持有,则该方法返回 true 表示获得了锁;否则,该方法并不会导致其执行线程被暂停而是直接返回 false,表示未获得锁。Lock 还提供了 tryLock(long time, TimeUnit unit) 方法,该方法会在给定时间内尝试获取锁,若超时则直接返回 false。
4)公平锁
Lock 提供了一个构造函数,允许我们对其公平性进行设置。公平性锁保证了锁的获取按照 FIFO 原则,但代价是进行大量的线程切换(增加了线程的暂停和唤醒)。非公平锁虽然可能造成线程 “饥饿”,但极少的线程切换保证了更大的吞吐量。
5)可绑定多个条件
一个 ReentrantLock 对象可以同时绑定多个 Condition 对象。而在 synchronized 中,锁对象的 wait() 跟它的 notify() 或 notifyAll() 方法配合只可以实现一个隐含的条件,但如果要和多于一个的条件关联时就不得不额外添加一个锁了。
6)监控锁的相关信息
Lock 提供了一些方法用来对锁的相关信息进行监控,比如:isLocked() 方法用于检测相应锁是否被某个线程持有,getQueueLength() 方法用于检查相应锁的等待线程数。而 synchronized 是没有的。
可重入锁
可重入锁(ReentrantLock)指的是线程可以重复获取同一把锁。如下代码示例,当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。
public class X {
private final Lock rtl = new ReentrantLock();
private int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
rtl.unlock();
}
}
}
ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
每个 Lock 实例都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁时就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
1. 内部结构
ReentrantLock 有三个内部类:Sync、NonfairSync 和 FairSync,所有操作都是通过这三个内部类完成的。
ReentrantLock 对于 Lock 接口的实现主要依赖了 Sync,而 Sync 继承了 AbstractQueuedSynchronizer(AQS)来进行锁状态的管理。在 AQS 中,定义了一个 volatile int state 变量作为共享资源,并且定义了锁的获取(acquire)与释放(release)以及维护了一个同步队列用来保存在该锁上等待的线程。
在重入锁的逻辑中重写了 AQS 具体获取(tryAcquire)与释放锁(tryRelease)的步骤。
2. lock
public void lock() {
sync.lock();
}
重入锁的加锁过程底层都需要用到 AQS 的 acquire 方法来获取同步状态。acquire 方法是 AQS 内部提供的一个方法,该方法的实现又依赖其子类实现的 tryAcquire 方法,重入锁的两种实现分别重写了 tryAcquire 方法,用以实现锁的可重入逻辑。
FairSync 的 lock 方法:
final void lock() {
// 必须以独占模式获取锁,在后续的调用中必须按照等待队列的顺序获取锁
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判断等待队列中是否有等待获取锁的节点,如果没有则尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
NonfairSync 的 lock 方法:
final void lock() {
// 一开始就尝试修改state状态,如果此时锁没有被获取,则状态修改成功,当前线程获取锁,体现了非公平的策略
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果修改失败,说明其他线程使用了这个锁,则以独占模式尝试获取锁
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 直接尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3. unlock
public void unlock() {
sync.release(1);
}
重入锁的解锁过程底层调用了 AQS 的 release 方法来释放同步状态。release 方法也是 AQS 内部提供的一个方法,该方法的实现又依赖其子类实现的 tryRelease 方法,ReentrantLock 中的 Sync 重写了这个方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 只有同步状态完全释放了,才返回true(可重入逻辑)
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS 实现
Doug Lea 曾经介绍过 AQS 的设计初衷。从原理上,一种同步结构往往是可以利用其他的结构实现的。但对某种同步结构的倾向,会导致复杂、晦涩的实现逻辑,所以,他选择将基础的同步操作抽象在 AQS 中,利用 AQS 为我们构建同步结构提供了范本。
1. 整体架构
当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理均依赖第五层的基础数据提供层。
2. 实现原理
AQS 核心思想是:如果被请求的共享资源空闲,那就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程封装成一个节点加入到队列中。
2.1 state
AQS 内部维护了一个用 volatile 修饰的 state 变量,同时提供了 setState 和 getState 方法,这个变量就是用来保存锁状态的。AQS 使用 CAS 操作来修改当前锁的状态,根据这个锁状态来判断该锁是否己经被别的线程持有了。同时在 AQS 内部还维护了一个先入先出(FIFO)的等待队列,所有没有请求到锁的线程,会进入到等待队列进行等待。等待线程释放锁之后,系统就能从等待队列中唤醒一个线程,继续工作。
public abstract class AbstractQueuedSynchronizer {
// Head of the wait queue
private transient volatile Node head;
// Tail of the wait queue
private transient volatile Node tail;
// The synchronization state
private volatile int state;
......
}
2.2 等待队列
AQS 依赖内部的等待队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,AQS 会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入到等待队列的尾部,同时会阻塞当前线程。头节点(head)是获取同步状态成功的节点,头节点的线程在释放同步状态时,会唤醒其 next 节点,而后继节点将会在获取同步状态成功时将自己设置为头节点。
static final class Node {
// 指示节点正在共享模式下等待的标记
static final Node SHARED = new Node();
// 指示节点正在独占模式下等待的标记
static final Node EXCLUSIVE = null;
// 等待状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 存放进入AQS队列中的线程
volatile Thread thread;
// 用来构造条件队列
Node nextWaiter;
......
}
队列中的 Node 节点有两种模式:
- 独占节点:同一时刻只能有一个线程访问资源,如 ReentrantLock
- 共享节点:同一时刻允许多个线程访问资源,如 Semaphore
其中 waitStatus 有下面几个枚举值:
- 0:当一个 Node 被初始化的时的默认值
- CANCELLED:为 1,表示线程获取锁的请求已经取消了
- CONDITION:为 -2,表示节点在等待队列中,节点线程等待唤醒
- PROPAGATE:为 -3,当前线程处在 SHARED 情况下,该字段才会使用
- SIGNAL:为 -1,表示线程已经准备好了,就等资源释放了
并通过 prev、next 指针构造双向的链表结构做为等待队列:
3. acquire
通过调用 AQS 的 acquire 方法可以获取同步状态,具体逻辑如下:
public final void acquire(int arg) {
// 尝试获取同步状态
if (!tryAcquire(arg) &&
// 如果同步状态获取失败,则构造同步节点(独占式),并通过addWaiter方法将该节点加入到等待队列尾部
// 最后调用acquireQueued方法,使得该节点以"死循环"的方式获取同步状态,
// 如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果在等待过程中被中断了,则重新把中断标志位设置上
selfInterrupt();
}
tryAcquire 在 AQS 中是一个直接抛出异常的实现,其实现逻辑由子类来实现。该方法返回一个布尔值,true 表示当前线程能够访问资源,false 表示当前线程不能访问资源,所以 tryAcquire 的作用是:决定线程是否能够访问受保护的资源。AQS 不关心 tryAcquire 方法里的具体逻辑,只需要知道能不能访问受保护的资源,然后来决定线程是放行还是进入等待队列(阻塞)。
3.1 addWaiter
假设在多线程环境下,线程 A 调用 tryAcquire 方法返回 true,线程 B 稍慢一步,调用 tryAcquire 方法返回了 false。则线程 A 获取同步状态成功,直接返回。由于线程 B 抢占失败执行 addWaiter 方法,该方法将新线程添加到等待队列的队尾。AQS 的中有两个属性 head、tail 分别表示等待队列的队首和队尾。
如果等待队列还未初始化,则会先初始化队列。在 enq 方法中会新建一个 Node 做为 head 和 tail,然后在之后的 for 循环中将参数 node 添加到队尾,因此等待队列初始化完后,里面会有两个节点,一个是空的结点 new Node(),另外一个就是对应当前线程的结点。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head; // 初始化一个空节点
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
由于线程 A 在 tryAcquire 时返回了 true,所以它会被直接放行,那么只有线程 B 会进入 addWaiter 方法,此时的等待队列如下:
3.2 acquireQueued
当线程 B 被添加到等待队列的尾部后,会继续执行 acquireQueued 方法,这就是 AQS 阻塞线程的地方。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环
for (;;) {
// 获取参数node的前一个节点
final Node p = node.predecessor();
// 重新调用tryAcquire判断线程是否能够访问资源了
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前线程获取锁失败后,需不需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 使用LockSupport阻塞当前线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
方法体中有一个 for (;;) 无限循环,该循环的退出条件只有 node 的前一个节点是 head 并且 tryAcquire 方法返回 true 时才会退出,否则线程就会被 parkAndCheckInterrupt 阻塞。
线程被阻塞后就不会继续执行了,但等到它被唤醒后,它还在 for (;;) 循环中,因此它又会调用 tryAcquire 方法去抢占锁,如果还是失败,那它又会被阻塞。
shouldParkAfterFailedAcquire 方法接收两个参数:前一个节点、当前节点,它会判断前一个节点的 waitStatus 属性是否等于 Node.SIGNAL,如果相等则返回 true。
4. release
AQS 中的 release 方法用于释放同步状态,该方法在释放了同步状态后,会唤醒其后继节点,进而使得后继节点重新尝试获取同步状态。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
同 tryAcquire 方法一样,tryRelease 方法的具体逻辑也是由子类来实现。该方法返回一个布尔值,表示当前线程释放锁是否成功。如果当前线程锁释放锁成功,则继续调用下面的 unparkSuccessor 方法唤醒其他线程。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
......
if (s != null)
LockSupport.unpark(s.thread);
}
该方法首先将 node.waitStatus 设置为 0,然后获取 node 的下一个节点,最后调用 LockSupport##unpark 方法唤醒线程,至此我们的 B 线程就被唤醒了。
唤醒后重新获取锁的逻辑:
接着上面 acquireQueued 方法的逻辑,线程 B 被唤醒后会重新尝试获取锁,线程 B 会先拿到它的上一个节点(当前是 head 结点),然后使用 if 判断:
if (p == head && tryAcquire(arg))
根据现在等待队列中的节点状态(只有一个线程 B 的节点,见上图),p == head 是返回 true 的,然后就是tryAcquire(arg) 了,假设此时线程 A 已经释放了锁,那现在线程 B 就能获取到锁了,所以 tryAcquire(arg) 也会返回 true。当线路 B 拿到锁后,会调用 setHead(node) 设置自己为队列的头节点:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
5. doAcquireNanos
doAcquireNanos 方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回 true,否则返回 felse。该方法是实现超时获取锁方法的关键。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前驱节点为头节点且尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,设置当前节点为头结点
setHead(node);
p.next = null;
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果当前线程获取同步状态失败,则判断是否已经超时,超时直接退出
if (nanosTimeout <= 0L)
return false;
// 剩余超时时间过短则不阻塞线程
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
// 使当前线程等待剩余超时时长
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}