引言
上一篇文章,我们学习了Lock接口,它提供的几个方法实现了显式的加锁和解锁逻辑。这篇文章,我们来看它的一个很重要的实现类ReentrantLock以及跟ReentrantLock的实现关系密切的AbstractQueuedSynchronizer。
类体系
首先,我们来看ReentrantLock和AbstractQueuedSynchronizer相关的类体系结构。AbstractQueuedSynchronizer是实现Lock的基本框架,它使用先进先出队列(FIFO)来实现线程之间的同步。ReentrantLock中的内部类Sync实现了AbstractQueuedSynchronizer,然后另外两个内部类NonfairSync和FairSync又实现了Sync,分别表示公平锁和非公平锁。所以大致的类图如下所示:
独占锁的获取与释放
独占锁的获取
独占锁的获取在ReentrantLock调用的是lock方法:
public void lock() {
sync.lock();
}
lock是内部类Sync的抽象方法:
abstract void lock();
具体实现由NonfairSync和FairSync分别给出。我们来看NonfairSync的实现:
先快速尝试获取锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先,它调用compareAndSetState方法尝试原子性地将state字段设置为1,compareAndSetState方法是AQS提供的protected,方便子类设置状态,类似的方法还有getState和setState。如果设置成功,就调用setExclusiveOwnerThread方法将同步器的持有线程设置为当前线程。这个逻辑就是快速尝试获取锁。快速尝试获取锁如果成功了,lock方法就直接返回,当前线程就继续执行。
如果快速获取锁没有成功,就会调用acquire方法,我们来看acquire方法的实现:
acquire方法在AQS中就给出了实现:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
快速获取锁失败则调用tryAcquire尝试获取锁
它首先调用了tryAcquire方法,tryAcquire方法在AQS中并没有给出实现,在Sync和FairSync中分别实现了该方法,而NonfairSync直接用的是Sync的该方法,Sync中该方法的实现如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这个方法的逻辑也比较简单,首先,获取当前state,如果state=0,就认为当前锁没有被持有,然后调用compareAndSetState来获取锁,如果获取成功,就将同步器的所有者线程设置为当前线程,如果compareAndSetState获取失败,就要走重入锁的逻辑。else if里面就是重入的逻辑,如果当前线程就是同步器的所有者线程,那么当前线程对锁的请求会再次成功,只是需要将state设置为原来的state+acquire的值,用来表示获取的次数。
如果tryAcquire获取锁成功,acquire方法就会返回,当前线程获取了锁,继续执行。
tryAcquire获取失败,构造节点加入等待队列
如果获取锁没有成功,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,我们先来看addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先,初始化了一个Node对象,我们需要注意Node的这个构造方法:
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
它将当前Node的nextWaiter设置为传入的mode,这里我们传入的是Node.EXCLUSIVE,也就是null,也就是说当前Node的nextWaiter是null。
然后会addWaiter会尝试快速地将Node入队列。快速入队列的逻辑是在当前同步器node队列的尾节点存在的情况下,将当前线程的node添加到队尾,这里用了compareAndSetTail原子操作来保证设置尾节点的并发安全。如果
当前同步器没有尾结点或者compareAndSetTail没有成功(存在竞争),就会调用enq方法来将节点入队列:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//自旋 保证节点最后会成功入队列
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾结点为null说明同步器的Node队列没有初始化,需要先初始化
//初始化会构造一个新的节点,这个节点没有对应的thread
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将当前Node加到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法整体就是一个自旋操作,它不断地尝试将当前节点插入到队尾。这里有一点需要注意的是尾结点不存在也就是队列还没有初始化的情况,如果没有初始化,会直接构造一个新的节点将其作为头结点(此时也是尾结点),这个新的头结点没有前驱节点和后继节点,也没有对应的线程和nextWaiter。队列初始化成功之后,就会将前面我们构造的节点设置为头结点的后继节点并作为尾结点。对于每个node来说,自旋操作肯定会最终将这个Node入队列。
从这一步始,我们假设一种场景,线程1来调用lock方法获取锁,此时没有其他线程的竞争,它肯定会在快速获取锁(compareAndSetState)这一步获取成功,这个线程就没有对应的Node对象(不会走到addWaiter方法),我们同时假设它将持有锁很长一段时间。在线程1获取锁之后,线程2也来获取锁,因为当前锁被线程1持有,所以线程2会走到addWaiter方法中初始化同步器的队列,并且线程2对应的Node是队列的尾结点,头结点是一个没有对应线程的节点。
节点加入队列后线程挂起并等待唤醒
线程加入队列之后,执行的是acquireQueued方法,我们来看它的实现:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//记录当前线程是否被中断过
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
//只有当前节点的前一个节点是头结点 才能尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取锁成功后 将当前节点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法也是处在一个自旋中,不过线程并不会一直运行,如果没有获取成功,它会调用shouldParkAfterFailedAcquire方法,我们看这个方法的实现:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
它会去检查它的前驱节点的状态。如果前驱节点是signal状态,这个状态的节点对应的线程释放锁之后会通知后续节点对应的线程,所以直接返回true。如果waitStatus>0,说明前驱节点是cancelled状态,这个状态的线程取消了对锁的请求,所以会循环将这种状态的节点断掉。如果waitStatus处于其他的状态,就将它的状态设置为signal。
我们继续上面的场景,线程2对应的节点加入队列成功后会执行acquireQueued方法,由于锁被线程1持有,所以获取锁肯定不能成功,进而执行shouldParkAfterFailedAcquire方法,而线程2对应的节点的前驱节点的状态为0,所以会调用compareAndSetWaitStatus方法将其状态设置为signal,然后返回false继续循环,第二次循环仍不能获取到锁,所以还会执行shouldParkAfterFailedAcquire方法,这次它的前驱节点的状态已经是signal了,所以直接返回true,调用parkAndCheckInterrupt方法,这个方法就很简单了:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
它会调用LockSuppport将当前线程挂起,使其处于WAITING状态。此时线程2就会在LockSupport.park(this)这里一直等待下去,等待唤醒。
这里还有一个线程被中断的逻辑,我没有分析明白,这里只讲解正常的流程。
如果此时又来了一个线程3来请求锁呢?逻辑也是一样的,线程3也会构造一个Node然后加入队列中,然后在acquireQueued方法中调用shouldParkAfterFailedAcquire方法,将它的前驱节点也就是线程2对应的状态设置为signal,然后执行LockSupport.park(this)将自己挂起等待前一个线程的唤醒。
为什么必须前一个节点是头结点才能尝试获取锁呢?应该是为了保证FIFO(个人猜测)。
如果线程获得了锁呢?逻辑就很简单了,它会将自己对应的节点设置为头结点,然后在自旋中返回。
独占锁的释放
独占锁的释放调用的是ReentrantLock的unlock方法:
public void unlock() {
sync.release(1);
}
AQS直接提供了release的实现:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease尝试释放锁
release首先调用的是tryRelease方法来释放锁,tryRelease方法是Sync实现的:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
它的逻辑很简单,就是设置state字段和同步器的所有者线程字段,不过这里要考虑到重入锁的问题,对于重入锁,state要设置成state的当前值-releases并且同步器所有者线程不能为null,还是当前线程。我们还需要注意返回值,如果是重入锁的逻辑,返回的是false,否则返回的是true,返回值代表着锁是否被释放了,重入锁只会减少锁持有的数量,如果数量不为零,锁不会被释放。
重入锁锁的持有数量不为零直接返回
release方法中,对tryRelease返回值进行了判断,如果锁没有被释放,也就是重入锁被持有的数量不为零,就会直接返回false,不会对节点队列做任何操作。
锁被释放后唤醒后继节点对应的线程
如果tryRelease方法成功释放了锁,会调用unparkSuccessor方法,这个方法会唤醒后继节点对应的线程:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
最后的LockSupport.unpark(s.thread);进行了唤醒操作。
继续我们的场景,线程1占用着锁,线程2和线程3被挂起处于WAITING状态。此时,线程1释放锁,而线程2对应的节点是头结点的后继节点,所以它会被唤醒,被唤醒后,线程2会将自己对应的节点设置为头结点。
整体来看,独享锁的释放逻辑比较简单。
独占式超时锁的获取
ReentrantLock的tryLock(long timeout, TimeUnit unit)方法用来超时获取锁。它的实现如下:
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
它调用的是sync的tryAcquireNanos方法,该方法由AQS给出:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
它同样会首先调用tryAcquire来尝试获得锁,如果快速获取成功则返回true,否则执行doAcquireNanos方法:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
与独占式获取锁的逻辑类似,首先会通过调用addWaiter方法将当前线程对应的节点入队列,然后自旋获取锁,获取锁的逻辑也与独占锁类似,只是在获取锁失败后,会看还剩多长的获取时间,如果没有剩余的获取时间,直接返回false,否则就调用LockSupport.parkNanos方法让线程挂起剩余的获取时间,注意这里将剩余的获取时间与进行了比较,如果剩余获取时间小于这个时间,则不挂起线程,直接自旋去获取锁。
spinForTimeoutThreshold的定义如下:
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
ReentrantLock的其他方法
独占锁的获取和释放是ReentrantLock的lock方法和unlock方法实现的,lock和unlock分别调用了AQS的acquire和release方法。超时独占锁是ReentrantLock的tryLock(long timeout, TimeUnit unit)方法实现的,它调用了AQS的tryAcquireNanos(int arg, long nanosTimeout)方法。我们继续看ReentrantLock的其他一些方法。
lockInterruptibly方法
这个方法在获取锁时能够响应中断,如果其他线程对该线程调用了Thread.interrupt ()方法,就会抛出InterruptedException异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
它调用的是sync.acquireInterruptibly(1)方法,这个方式是AQS提供的,我们来看它的实现:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
它也会首先调用tryAcquire方法来尝试快速获取锁,如果快速获取锁成功,就直接返回,如果失败,就会调用doAcquireInterruptibly方法,我们重点来看这个方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法与上面我们分析的独占锁中的acquireQueued方法以及超时独占锁中的doAcquireNanos方法很相似。首先,会构造当前线程对应的节点添加到等待队列中,然后自旋来获取锁,不能获取锁就自己挂起。与另外两个方法不同的是,如果线程被唤醒并且线程中断过,就会抛出InterruptedException异常。
小结
ReentrantLock与AQS紧密结合实现了重入锁的逻辑,包括独占锁的获取和释放、超时独占锁的获取、能够响应中断的获取锁等。ReentrantLock还有很多其他的内容,这里不再一一介绍。AQS除了能实现独占锁,还能实现共享锁,下一篇文章,我们来看共享锁的实现。