引言
实现一个锁需要解决哪些问题?(需求讨论)
1、锁一般都是互斥的,如何标识一把锁已经被某线程抢占了(其他线程在锁释放之前只能阻塞,无法抢占锁),可以使用boolean类型,也可以使用int类型的0和1来解决,不过需要使用volatile修饰,解决多个线程访问共享变量的可见性问题。AQS中有一个volatile int state变量。
2、只知道锁已经被某线程抢占了还不行,还得知道是哪一个线程持有锁,所以需要一个变量标识持有锁的线程。AQS中有一个变量exclusiveOwnerThread。
3、抢占锁失败的线程在锁被释放以后还有机会继续尝试抢占锁,那么需要一种数据结构来保存这些抢占锁失败的线程。考虑到竞争的公平性,这个数据结构最好是一个FIFO队列,哪一个线程先去抢占锁那么他就排在队列的前面。
公平锁与非公平锁最大的区别,在独占锁被释放以后,是否优先考虑等待队列中的线程去抢占锁。
4、需要有一个线程挂起/唤醒机制,Thread.sleep、wait/notify、LockSupport.park/unpark、Thread.join,这些通信方式中只有LockSupport支持唤醒任何线程。
5、锁的可重入性,如果一个线程已经得到锁,在没有释放锁的前提下(只有处理完自己的资源后才需要释放锁),需要再次获取锁,但独占锁又被自己持有,这样就会产生死锁。锁的可重入性就是为了解决这种死锁问题的。
AQS中是通过exclusiveOwnerThread和state来实现可冲入的,在state上做加法不断计数表示锁的重入次数。
6、其他特殊场景的处理,比如说共享锁,某一线程已经持有锁资源后,也允许其他线程共同持有锁资源。
一、AbstractQueuedSynchronizer.Node
Node类是一个AbstractQueuedSynchronizer(以下简称AQS)内部静态类,成员变量如下
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
说明:
1 从成员变量的名称上可以猜测,SHARED是用于共享锁场景,EXCLUSIVE用于独占锁场景。
2 状态变量waitStatus表示线程等待队列中的Node节点的等待状态,初始值为0,独占锁场景下,其值只能是CANCELLED或者SIGNAL;PROPAGATE可能是共享锁场景要用到的,CONDITION是在通过调用AQS.ConditionObject的await,singnal/singnalAll时才出现的。
3 我们都知道AQS内部会维持一个FIFO队列,存储那些抢站锁失败的线程,即将线程相关信息封装成Node节点,所以Node内部会持有该Thread,也就是thread变量。
4 FIFO队列中的Node节点之间是通过prev和next来关联的,prev表示当前节点的前驱节点,next表示当前节点的后继节点。
5 还有一个nextWaiter,独占锁场景上没有用上,仅仅作为一个标识,永远会将Node.nextWaiter设置成EXCLUSIVE,也就是null。
二、AbstractQueuedSynchronizer核心属性
private volatile int state;
//从父类AbstractOwnableSynchronizer继承的
private transient Thread exclusiveOwnerThread;
private transient volatile Node head;
private transient volatile Node tail;
说明:
1 state表示锁的状态,独掌锁场景下,state=0表示没有线程占有锁,state=1表示已经有线程占有锁,state>1表示有线程多次占有锁(即重入锁,线程在占有锁的情况下,还没有释放已经占有的锁后,还能接着占有锁)。
2 exclusiveOwnerThread表示独占锁场景下,成功占有锁的线程。
3 sync queue的队头head,为dummy节点(没有Thread信息,waitStatus=0),tail是队尾(新入队列的节点)。
三、公平锁和非公平锁
ReentrantLock的默认构造器是创建非公平锁的,也可通过传入nonFair=false来创建公平锁。
public class ReentrantLock{
//提供所有实现的同步结构
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer{}
//非公平锁
static final class NonfairSync extends Sync {}
//公平锁
static final class FairSync extends Sync {}
public ReentrantLock() {
sync = new NonfairSync();
}
}
四、抢占锁源码阅读
以公平锁FairSync的实现来分析源码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
主要有四个方法
1 boolean lockFlag = tryAcquire(1);
该方法由AQS的子类FairSync实现,主要是线程抢占锁的逻辑。
2 Node node = addWaiter(Node.EXCLUSIVE);
该方法由AQS实现,线程抢占锁失败后,设置sync queue的head节点,并将当前线程包装成Node节点插入尾节点后返回。
3 boolean queueFlag = acquireQueued(node, 1);
该方法由AQS实现,逻辑比较复杂,主要不断尝试做如下两种操作之一:a 如果上一步addWaiter返回的节点的前驱节点是head节点,则再次尝试抢占锁acquire(1);b 如果不是,则挂起当前线程,让出CPU资源。返回结果是,当前线程是否需要响应中断。
4 selfInterrupt();
该方法是由AQS实现,在上面FairSync.acquire(1)抢占锁失败时,线程是不响应中断的,但是如果抢占锁失败的过程中,真实发生了线程中断,我们可不能不管不问啊。所以会放在抢占锁逻辑的最后面延迟处理一下。
4.1 FairSync.tryAcquire(1)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
说明:
1 state等于0时,说明没有线程成功抢到锁,若以要检查自己是不是sync queue的第一个Node节点,如果是,就通过CAS机制将state设置成1,并设置成功抢占锁的线程为Node的exclusiveOwnwerThread。
2 如果c大于0,表示已经有线程占有锁,检查占有锁的线程是否是当前线程。若是,则只需设置state增加重入次数。因为线程已经占有锁,所以不再需要使用CAS来保证setState的线程安全。
3 抢占锁其实就做一件事情,设置state的值,多线程环境下,state必须使用volatile来保证可见性。
4.2 FairSync.addWaiter(Node.EXCLUSIVE)
如果tryAcquire失败了,也就是说多个线程同时抢占锁,只会有一个线程成功,那么剩余的线程怎么办?当然需要将将这些线程封装成Node节点保存在一个等待锁的FIFO队列中。
Node node = addWaiter(Node.EXCLUSIVE);
可见独占锁场景下,每一个Node节点的nextWaiter都是Node.EXCLUSIVE。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
说明:
1 如果队列为空队列,尾节点是null,则直接将当前节点入队列(enq)。
2 如果队列已经有元素了,tail不为null,将当前节点的前驱节点设置为tail,然后再通过CAS机制,将当前节点设置成sync queue的尾节点,如果设置成功,也将上任尾节点的后继节点设置成当前节点,形成双向链表。
3 多线程环境下,如果某一线程已经入队列了,那么导致其他线程CAS失败,这时再通过enq入队列,enq通过自旋+CAS保证Node进入队列。
FairSync.enq(Node node)
进入这一方法有两种情况,
a 当前sync queue没有等待锁的Node。
b 由于其他线程率先成功使用CAS入队,导致当前线程入队是时读取到的tail节点不是真实的tail节点,导致入队失败。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
说明:
1 如果sync queue中没有要等待锁的Node(线程)时,初始化一个Node节点作为头节点和尾节点,注意是一个new Node(),什么也没有的Node。由于队列时空的,可能多个线程同时设置head节点,所以需要使用CAS保证线程安全。
2 然后在下一轮的循环中,tail已经不为null了,再通过CAS将node设置到队列的尾部,如果设置失败,继续不断的尝试,直到成功为止。注意这个过程会发生有趣的尾分裂现象。
3 在sync queue中,head节点是哑节点,不代表任何线程。why
addWaiter总结
1 addWaiter不含有关于锁的任何操作,它主要是将抢占锁失败的线程包装成Node节点,并不断尝试将此Node入队列。如果队列是空队列,则初始化一个哑节点(不代表任何线程)作为头节点,再将当前节点加在head节点的后面。
2 addWaiter返回的是当前线程包装的Node节点,也就是enq调用完后,返回的必定是sync queue的尾节点。在addWaiter返回时,当前线程包装成的Node节点已经进入了队列。
4.3 FairSync.acquireQueued(node, 1)
acquireQueued的逻辑比较复杂,需要重点吃透。在分析其源码的前面,需要再次说明几点,
1 能进入到此方法,说明当前线程争抢锁失败后,已经被包装成Node节点,并且进入sync queue了。
2 如果当前节点的前驱节点是head,则尝试再争抢锁一次。
3 检查一下,争抢锁失败后,是否需要将当前线程挂起。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
为什么会在for循环中,不断检查当前节点的前驱节点是head时,再去尝试获取锁呢?
我们知道传入的node节点是addWaiter返回的,它一定是当前队列的尾节点。代码中检测到当前节点的前驱节点是head节点(head节点是哑节点,不代表任何线程,或者说代表当前持有独占锁的线程),表示当前节点已经排在等待锁的队列的最前方了。再接着看
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
1 如果获取锁成功,则将当前Node节点设置成head节点(并把当前节点的thread设置成null,节点的前驱节点设置成null),为什么把当前节点的thread设置成null,因为在tryAcquire成功抢占锁时,已经将AQS.exclusiveOwnableThread设置成成功抢占锁的线程了,Node节点的thread已经没有必要再记录了,从其他角度来说,是将当前线程从等待锁的队列中拿出来了,这是一个让当前线程出队列的变相操作。
2 因为当前Node是sync queue的尾节点了,所以sync queu不可能是空队列了,也就间接表明,没有其他线程做设置队列的head操作,所以可以直接调用setHead方法来保证线程安全性(因为线程已经占有锁)。整个if语句内容是线程安全的,不需要向enq那样使用CAS保证线程安全性。
如果当前Node的前驱节点不是head节点,或者如上所说,但是争抢锁失败了呢?
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
interrupted = true;
}
shouldParkAfterFailedAcquire(p, node)
从函数名就可以看出,这个函数是要判断在争抢锁失败后,是否挂起当前线程。
本文开篇提到了AQS.Node结构,前面也用到了prev和next,thread,nextWaiter,就是没有用到waitStatus,注意看接下来就是waitStatus的表演。
前面提到waitStatus有4种值,分别是CANCELLED,SIGNAL,CONDITION,PROPAGATE,再加上初始值0,一共五种值。在独占锁场景下,只用到了CANCELLED(1),SIGNAL(-1),已经初始值0。
那么CANCELLED和SIGNAL到底代表着什么意思呢?
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
通过查看文档发现,
1 CANCELLED表示当前Node节点的线程已经取消了排队,不想再去等待获取锁了。
2 SIGNAL就比较有意思了,它不是表征当前Node节点的waitStatus,而是表征当前节点的下一个节点的waitStatus。
如果一个节点的waitStatus是SIGNAL的话,那就表示它的下一个节点(或者是后继节点)已经被挂起或者(马上就要被挂起),所以当一个节点在释放锁或者取消等待锁之后,还需要做一件非常重要的事,那就是如果它的waitStatus是SIGNAL的话,它还需要唤醒它的后继节点。
另外这个SIGNAL状态,通常都不是节点自己给自己设置的,而是后继节点给其设置的,打个比方来说,
你去食堂排队就餐,你去的比较晚,前面已经排了10个人了,按照每人需要3min完成打饭来算,你可能需要等上半个小时,但是你不想无聊的干等着(去旁边看看其他窗口有啥好吃的),你就跟前面的哥们商量,能不能在快到你(release),或者你不想等待(cancle)时,麻烦通知我一下,于是你将他的状态设置成SIGNAL,去旁边溜达了。
换个角度讲,如果你决定将某个线程挂起,那么你必须要保证在队列中线程的前驱节点的状态是SIGNAL,这样相当于给线程设定了一个闹钟然后才能睡,等设定时间到了,闹钟会叫醒你,否则,你要睡到天荒地老了。
有了这些背景知识,我们再来看源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
说明:
1 如果当前节点的前驱节点的waitStatus是SIGNAL状态,那么可以直接关起当前线程。
2 如果前驱节点的waitStatus是CANCELLED,那么通过do-while循环,从后往前找一直找到队列前面waitStatus是SIGNAL的Node节点,并将此节点的后继节点设置成当前节点,并返回false,也就是说当前线程不能挂起。
3 如果前驱节点的waitStatus是其他状态,CONDITION和PROPAGATE,则通过CAS将前驱节点的waitStatus设置成SIGNAL。并返回false,不挂起当前线程。
为了方便查看,shouldParkAfterFailedAcquire返回false后,程序会怎么处理,我将代码重新复制粘贴如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//注意,在这个地方被调用
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到如果shouldParkAfterFailedAcquire返回false后,程序会继续回到循环中,重新判断前驱节点的状态,因为排在队列前面中的线程一旦出队列(成功抢占锁),队列的head节点就会发生变化,这样就轮到当前线程再次争抢锁了。
如果shouldParkAfterFailedAcquire返回true,后会接着执行parkAndCheckInterrupt将当前线程挂起。
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park(this)会将当前线程挂起,除非其他线程调用了LockSupport.unpark唤醒了当前线程,或者当前线程被中断,否则程序一直停在这里不会继续往下执行,后面的Thread.interrupted也不会执行。
五、总结
1 AQS中使用state表示锁,如果线程使用CAS成功将state由0设置成1,即表示获得锁。
2 addWriter将争抢锁失败的线程包装成Node节点,放入sync queue的尾部,是通过enq初始化等待锁队列的,并创建一个哑节点作为head。
3 acquireQueued比较上一步入队列的节点的前驱节点是否是head,如果是,则再次尝试抢占锁;或者将当前线程挂起。
4 shouldParkAfterFailedAcquire保证当前节点的前驱节点waitStatus是SIGNAL状态,从而保证自己挂起后,前驱节点会在适当的时机唤醒自己。
参考:
1 逐行分析AQS源码—独占锁的获取,https://segmentfault.com/a/1190000015739343。
2 Java并发之学习ReentrantLock,https://cloud.tencent.com/developer/article/1038499。
2 不可不说的Java锁事,https://tech.meituan.com/2018/11/15/java-lock.html。