引言

实现一个锁需要解决哪些问题?(需求讨论)
1、锁一般都是互斥的,如何标识一把锁已经被某线程抢占了(其他线程在锁释放之前只能阻塞,无法抢占锁),可以使用boolean类型,也可以使用int类型的0和1来解决,不过需要使用volatile修饰,解决多个线程访问共享变量的可见性问题。AQS中有一个volatile int state变量。
2、只知道锁已经被某线程抢占了还不行,还得知道是哪一个线程持有锁,所以需要一个变量标识持有锁的线程。AQS中有一个变量exclusiveOwnerThread。
3、抢占锁失败的线程在锁被释放以后还有机会继续尝试抢占锁,那么需要一种数据结构来保存这些抢占锁失败的线程。考虑到竞争的公平性,这个数据结构最好是一个FIFO队列,哪一个线程先去抢占锁那么他就排在队列的前面。
公平锁与非公平锁最大的区别,在独占锁被释放以后,是否优先考虑等待队列中的线程去抢占锁。
4、需要有一个线程挂起/唤醒机制,Thread.sleep、wait/notify、LockSupport.park/unpark、Thread.join,这些通信方式中只有LockSupport支持唤醒任何线程。
5、锁的可重入性,如果一个线程已经得到锁,在没有释放锁的前提下(只有处理完自己的资源后才需要释放锁),需要再次获取锁,但独占锁又被自己持有,这样就会产生死锁。锁的可重入性就是为了解决这种死锁问题的。
AQS中是通过exclusiveOwnerThread和state来实现可冲入的,在state上做加法不断计数表示锁的重入次数。
6、其他特殊场景的处理,比如说共享锁,某一线程已经持有锁资源后,也允许其他线程共同持有锁资源。

一、AbstractQueuedSynchronizer.Node

Node类是一个AbstractQueuedSynchronizer(以下简称AQS)内部静态类,成员变量如下

  1. static final class Node {
  2. static final Node SHARED = new Node();
  3. static final Node EXCLUSIVE = null;
  4. static final int CANCELLED = 1;
  5. static final int SIGNAL = -1;
  6. static final int CONDITION = -2;
  7. static final int PROPAGATE = -3;
  8. volatile int waitStatus;
  9. volatile Node prev;
  10. volatile Node next;
  11. volatile Thread thread;
  12. Node nextWaiter;
  13. }

说明:

1 从成员变量的名称上可以猜测,SHARED是用于共享锁场景,EXCLUSIVE用于独占锁场景。
2 状态变量waitStatus表示线程等待队列中的Node节点的等待状态,初始值为0,独占锁场景下,其值只能是CANCELLED或者SIGNAL;PROPAGATE可能是共享锁场景要用到的,CONDITION是在通过调用AQS.ConditionObject的await,singnal/singnalAll时才出现的。
3 我们都知道AQS内部会维持一个FIFO队列,存储那些抢站锁失败的线程,即将线程相关信息封装成Node节点,所以Node内部会持有该Thread,也就是thread变量。
4 FIFO队列中的Node节点之间是通过prev和next来关联的,prev表示当前节点的前驱节点,next表示当前节点的后继节点。
5 还有一个nextWaiter,独占锁场景上没有用上,仅仅作为一个标识,永远会将Node.nextWaiter设置成EXCLUSIVE,也就是null。

二、AbstractQueuedSynchronizer核心属性

  1. private volatile int state;
  2. //从父类AbstractOwnableSynchronizer继承的
  3. private transient Thread exclusiveOwnerThread;
  4. private transient volatile Node head;
  5. private transient volatile Node tail;

说明:
1 state表示锁的状态,独掌锁场景下,state=0表示没有线程占有锁,state=1表示已经有线程占有锁,state>1表示有线程多次占有锁(即重入锁,线程在占有锁的情况下,还没有释放已经占有的锁后,还能接着占有锁)。
2 exclusiveOwnerThread表示独占锁场景下,成功占有锁的线程。
3 sync queue的队头head,为dummy节点(没有Thread信息,waitStatus=0),tail是队尾(新入队列的节点)。

三、公平锁和非公平锁

ReentrantLock的默认构造器是创建非公平锁的,也可通过传入nonFair=false来创建公平锁。

  1. public class ReentrantLock{
  2. //提供所有实现的同步结构
  3. private final Sync sync;
  4. abstract static class Sync extends AbstractQueuedSynchronizer{}
  5. //非公平锁
  6. static final class NonfairSync extends Sync {}
  7. //公平锁
  8. static final class FairSync extends Sync {}
  9. public ReentrantLock() {
  10. sync = new NonfairSync();
  11. }
  12. }

四、抢占锁源码阅读

以公平锁FairSync的实现来分析源码

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

主要有四个方法
1 boolean lockFlag = tryAcquire(1);
该方法由AQS的子类FairSync实现,主要是线程抢占锁的逻辑。

2 Node node = addWaiter(Node.EXCLUSIVE);
该方法由AQS实现,线程抢占锁失败后,设置sync queue的head节点,并将当前线程包装成Node节点插入尾节点后返回。

3 boolean queueFlag = acquireQueued(node, 1);
该方法由AQS实现,逻辑比较复杂,主要不断尝试做如下两种操作之一:a 如果上一步addWaiter返回的节点的前驱节点是head节点,则再次尝试抢占锁acquire(1);b 如果不是,则挂起当前线程,让出CPU资源。返回结果是,当前线程是否需要响应中断。
4 selfInterrupt();
该方法是由AQS实现,在上面FairSync.acquire(1)抢占锁失败时,线程是不响应中断的,但是如果抢占锁失败的过程中,真实发生了线程中断,我们可不能不管不问啊。所以会放在抢占锁逻辑的最后面延迟处理一下。

4.1 FairSync.tryAcquire(1)

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (!hasQueuedPredecessors() &&
  6. compareAndSetState(0, acquires)) {
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. else if (current == getExclusiveOwnerThread()) {
  12. int nextc = c + acquires;
  13. if (nextc < 0)
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }

说明:
1 state等于0时,说明没有线程成功抢到锁,若以要检查自己是不是sync queue的第一个Node节点,如果是,就通过CAS机制将state设置成1,并设置成功抢占锁的线程为Node的exclusiveOwnwerThread。
2 如果c大于0,表示已经有线程占有锁,检查占有锁的线程是否是当前线程。若是,则只需设置state增加重入次数。因为线程已经占有锁,所以不再需要使用CAS来保证setState的线程安全。
3 抢占锁其实就做一件事情,设置state的值,多线程环境下,state必须使用volatile来保证可见性。

4.2 FairSync.addWaiter(Node.EXCLUSIVE)

如果tryAcquire失败了,也就是说多个线程同时抢占锁,只会有一个线程成功,那么剩余的线程怎么办?当然需要将将这些线程封装成Node节点保存在一个等待锁的FIFO队列中。
Node node = addWaiter(Node.EXCLUSIVE);
可见独占锁场景下,每一个Node节点的nextWaiter都是Node.EXCLUSIVE。

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

说明:
1 如果队列为空队列,尾节点是null,则直接将当前节点入队列(enq)。
2 如果队列已经有元素了,tail不为null,将当前节点的前驱节点设置为tail,然后再通过CAS机制,将当前节点设置成sync queue的尾节点,如果设置成功,也将上任尾节点的后继节点设置成当前节点,形成双向链表。
3 多线程环境下,如果某一线程已经入队列了,那么导致其他线程CAS失败,这时再通过enq入队列,enq通过自旋+CAS保证Node进入队列。

FairSync.enq(Node node)
进入这一方法有两种情况,
a 当前sync queue没有等待锁的Node。
b 由于其他线程率先成功使用CAS入队,导致当前线程入队是时读取到的tail节点不是真实的tail节点,导致入队失败。

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

说明:
1 如果sync queue中没有要等待锁的Node(线程)时,初始化一个Node节点作为头节点和尾节点,注意是一个new Node(),什么也没有的Node。由于队列时空的,可能多个线程同时设置head节点,所以需要使用CAS保证线程安全。
2 然后在下一轮的循环中,tail已经不为null了,再通过CAS将node设置到队列的尾部,如果设置失败,继续不断的尝试,直到成功为止。注意这个过程会发生有趣的尾分裂现象。
3 在sync queue中,head节点是哑节点,不代表任何线程。why

addWaiter总结
1 addWaiter不含有关于锁的任何操作,它主要是将抢占锁失败的线程包装成Node节点,并不断尝试将此Node入队列。如果队列是空队列,则初始化一个哑节点(不代表任何线程)作为头节点,再将当前节点加在head节点的后面。
2 addWaiter返回的是当前线程包装的Node节点,也就是enq调用完后,返回的必定是sync queue的尾节点。在addWaiter返回时,当前线程包装成的Node节点已经进入了队列。

4.3 FairSync.acquireQueued(node, 1)

acquireQueued的逻辑比较复杂,需要重点吃透。在分析其源码的前面,需要再次说明几点,
1 能进入到此方法,说明当前线程争抢锁失败后,已经被包装成Node节点,并且进入sync queue了。
2 如果当前节点的前驱节点是head,则尝试再争抢锁一次。
3 检查一下,争抢锁失败后,是否需要将当前线程挂起。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

为什么会在for循环中,不断检查当前节点的前驱节点是head时,再去尝试获取锁呢?

我们知道传入的node节点是addWaiter返回的,它一定是当前队列的尾节点。代码中检测到当前节点的前驱节点是head节点(head节点是哑节点,不代表任何线程,或者说代表当前持有独占锁的线程),表示当前节点已经排在等待锁的队列的最前方了。再接着看

  1. final Node p = node.predecessor();
  2. if (p == head && tryAcquire(arg)) {
  3. setHead(node);
  4. p.next = null; // help GC
  5. failed = false;
  6. return interrupted;
  7. }
  8. private void setHead(Node node) {
  9. head = node;
  10. node.thread = null;
  11. node.prev = null;
  12. }

1 如果获取锁成功,则将当前Node节点设置成head节点(并把当前节点的thread设置成null,节点的前驱节点设置成null),为什么把当前节点的thread设置成null,因为在tryAcquire成功抢占锁时,已经将AQS.exclusiveOwnableThread设置成成功抢占锁的线程了,Node节点的thread已经没有必要再记录了,从其他角度来说,是将当前线程从等待锁的队列中拿出来了,这是一个让当前线程出队列的变相操作。
2 因为当前Node是sync queue的尾节点了,所以sync queu不可能是空队列了,也就间接表明,没有其他线程做设置队列的head操作,所以可以直接调用setHead方法来保证线程安全性(因为线程已经占有锁)。整个if语句内容是线程安全的,不需要向enq那样使用CAS保证线程安全性。

如果当前Node的前驱节点不是head节点,或者如上所说,但是争抢锁失败了呢?

  1. if (shouldParkAfterFailedAcquire(p, node) &&
  2. parkAndCheckInterrupt()){
  3. interrupted = true;
  4. }

shouldParkAfterFailedAcquire(p, node)

从函数名就可以看出,这个函数是要判断在争抢锁失败后,是否挂起当前线程。
本文开篇提到了AQS.Node结构,前面也用到了prev和next,thread,nextWaiter,就是没有用到waitStatus,注意看接下来就是waitStatus的表演。
前面提到waitStatus有4种值,分别是CANCELLEDSIGNALCONDITIONPROPAGATE,再加上初始值0,一共五种值。在独占锁场景下,只用到了CANCELLED(1),SIGNAL(-1),已经初始值0。
那么CANCELLED和SIGNAL到底代表着什么意思呢?

  1. /** waitStatus value to indicate thread has cancelled */
  2. static final int CANCELLED = 1;
  3. /** waitStatus value to indicate successor's thread needs unparking */
  4. static final int SIGNAL = -1;

通过查看文档发现,
1 CANCELLED表示当前Node节点的线程已经取消了排队,不想再去等待获取锁了。
2 SIGNAL就比较有意思了,它不是表征当前Node节点的waitStatus,而是表征当前节点的下一个节点的waitStatus。
如果一个节点的waitStatus是SIGNAL的话,那就表示它的下一个节点(或者是后继节点)已经被挂起或者(马上就要被挂起),所以当一个节点在释放锁或者取消等待锁之后,还需要做一件非常重要的事,那就是如果它的waitStatus是SIGNAL的话,它还需要唤醒它的后继节点。
另外这个SIGNAL状态,通常都不是节点自己给自己设置的,而是后继节点给其设置的,打个比方来说,

你去食堂排队就餐,你去的比较晚,前面已经排了10个人了,按照每人需要3min完成打饭来算,你可能需要等上半个小时,但是你不想无聊的干等着(去旁边看看其他窗口有啥好吃的),你就跟前面的哥们商量,能不能在快到你(release),或者你不想等待(cancle)时,麻烦通知我一下,于是你将他的状态设置成SIGNAL,去旁边溜达了。

换个角度讲,如果你决定将某个线程挂起,那么你必须要保证在队列中线程的前驱节点的状态是SIGNAL,这样相当于给线程设定了一个闹钟然后才能睡,等设定时间到了,闹钟会叫醒你,否则,你要睡到天荒地老了。
有了这些背景知识,我们再来看源码:

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. return true;
  5. if (ws > 0) {
  6. do {
  7. node.prev = pred = pred.prev;
  8. } while (pred.waitStatus > 0);
  9. pred.next = node;
  10. } else {
  11. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  12. }
  13. return false;
  14. }

说明:
1 如果当前节点的前驱节点的waitStatus是SIGNAL状态,那么可以直接关起当前线程。
2 如果前驱节点的waitStatus是CANCELLED,那么通过do-while循环,从后往前找一直找到队列前面waitStatus是SIGNAL的Node节点,并将此节点的后继节点设置成当前节点,并返回false,也就是说当前线程不能挂起。
3 如果前驱节点的waitStatus是其他状态,CONDITION和PROPAGATE,则通过CAS将前驱节点的waitStatus设置成SIGNAL。并返回false,不挂起当前线程。

为了方便查看,shouldParkAfterFailedAcquire返回false后,程序会怎么处理,我将代码重新复制粘贴如下:

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. //注意,在这个地方被调用
  14. if (shouldParkAfterFailedAcquire(p, node) &&
  15. parkAndCheckInterrupt())
  16. interrupted = true;
  17. }
  18. } finally {
  19. if (failed)
  20. cancelAcquire(node);
  21. }
  22. }

可以看到如果shouldParkAfterFailedAcquire返回false后,程序会继续回到循环中,重新判断前驱节点的状态,因为排在队列前面中的线程一旦出队列(成功抢占锁),队列的head节点就会发生变化,这样就轮到当前线程再次争抢锁了。
如果shouldParkAfterFailedAcquire返回true,后会接着执行parkAndCheckInterrupt将当前线程挂起。

parkAndCheckInterrupt

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

LockSupport.park(this)会将当前线程挂起,除非其他线程调用了LockSupport.unpark唤醒了当前线程,或者当前线程被中断,否则程序一直停在这里不会继续往下执行,后面的Thread.interrupted也不会执行。

五、总结

1 AQS中使用state表示锁,如果线程使用CAS成功将state由0设置成1,即表示获得锁。
2 addWriter将争抢锁失败的线程包装成Node节点,放入sync queue的尾部,是通过enq初始化等待锁队列的,并创建一个哑节点作为head。
3 acquireQueued比较上一步入队列的节点的前驱节点是否是head,如果是,则再次尝试抢占锁;或者将当前线程挂起。
4 shouldParkAfterFailedAcquire保证当前节点的前驱节点waitStatus是SIGNAL状态,从而保证自己挂起后,前驱节点会在适当的时机唤醒自己。

参考:
1 逐行分析AQS源码—独占锁的获取,https://segmentfault.com/a/1190000015739343。
2 Java并发之学习ReentrantLock,https://cloud.tencent.com/developer/article/1038499。
2 不可不说的Java锁事,https://tech.meituan.com/2018/11/15/java-lock.html。