上锁

点击查看【processon】

  • 里面蓝色的方块表明要铺开分析的部分;虚线的菱形方块表示模板方法,其指向的方块是具体实现

整体将划分为:

  1. 进入 acquire() ,分析 acquire() 的源码
  2. 学习 tryAcquire() 的具体实现,看看 ReentrantLock 如何实现的 tryAcquire()
  3. 学习 addWaiter()
  4. 学习 acquireQueued()

下面就直奔主题:

  1. acquire() 属于 public final 级方法,它规划了获取锁的整体流程。它首先调用了模板方法 tryAcquire() 来尝试获取锁,如果获取锁失败,则进入等待队列;如果获取成功,就直接返回

    1. // AbstractQueuedSynchronizer#acquire
    2. public final void acquire(int arg) {
    3. // 尝试acquire
    4. // 如果tryAcquire失败了,则将当前线程封装为节点入队
    5. if (!tryAcquire(arg) &&
    6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    7. selfInterrupt();
    8. }
  2. 下面看看 tryAcquire() 的具体实现,我们当前以 ReentrantLock 为例:

    1. // ReentrantLock.NonfairSync#tryAcquire
    2. protected final boolean tryAcquire(int acquires) {
    3. return nonfairTryAcquire(acquires);
    4. }
    5. // ReentrantLock.Sync#nonfairTryAcquire
    6. final boolean nonfairTryAcquire(int acquires) {
    7. final Thread current = Thread.currentThread(); // 获取当前线程
    8. int c = getState(); // 获取当前同步器的同步状态
    9. if (c == 0) { // state == 0 表示没有人持有锁
    10. if (compareAndSetState(0, acquires)) { // 尝试获取锁
    11. setExclusiveOwnerThread(current); // 如果获取成功,则将自己设置为锁持有者
    12. return true;
    13. }
    14. }
    15. else if (current == getExclusiveOwnerThread()) { // 如果持有锁的线程就是自己本身(重入锁)
    16. int nextc = c + acquires; // 增加次数
    17. if (nextc < 0) // 溢出
    18. throw new Error("Maximum lock count exceeded");
    19. setState(nextc);
    20. return true;
    21. }
    22. return false; // 获取失败进入等待
    23. }

    | 在 tryAcquire() 的实现里面,我们可以看到两点:
    - 虽然等待队列是 FIFO 的,但是 acquire() 先执行 tryAcquire() 会让新来的线程有机会直接获取到锁(在释放锁的瞬间,新线程直接 tryAcquire() 。这是非公平锁的原理。
    - 进入 tryAcquire() 方法后,发现持有锁的就是自己本身,那么就对 **state** 进行递增释放锁时也逐个递减。这就是锁可重入的原理。
    |

    小北-老师.jpeg | | —- | —- | | | |

  3. tryAcquire() 失败时,该线程就要准备进入等待队列—— addWaiter()

    1. // AbstractQueuedSynchronizer#addWaiter()
    2. private Node addWaiter(Node mode) {
    3. // 将当前线程封装为Node
    4. Node node = new Node(Thread.currentThread(), mode);
    5. // 这部分是简单版的入队操作,主要原因是为了提高效率;否则进入enq(),就是完整的入队操作
    6. Node pred = tail;
    7. if (pred != null) { // TAIL不为空
    8. node.prev = pred; // 将node的prev属性设置为tail(链表式队列的操作)
    9. if (compareAndSetTail(pred, node)) { // 尝试CAS将node作为TAIL
    10. pred.next = node; // 如果CAS成功,那么直接设置TAIL的next属性
    11. return node;
    12. }
    13. }
    14. enq(node); // 进入完整的入队操作
    15. return node;
    16. }
    17. ————————————————————————————————————————————————————————————————————————————————
    18. // AbstractQueuedSynchronizer#enq()
    19. private Node enq(final Node node) {
    20. for (;;) { // 经典的死循环
    21. Node t = tail; // 流程和简化版入队很像,就是多了一步初始化
    22. if (t == null) { // 如果TAIL为null,说明还没有初始化
    23. if (compareAndSetHead(new Node())) // 随便搞个Node对象,先初始化队列
    24. tail = head;
    25. } else {
    26. node.prev = t; // 将node的prev属性设置为TAIL(和前面一样的)
    27. if (compareAndSetTail(t, node)) { // 后续的逻辑和简化版一样
    28. t.next = node;
    29. return t;
    30. }
    31. }
    32. }
    33. }

    | 这里有两个细节要讨论一下:
    1. 区分简化版和完整版的入队操作。针对已经完成初始化的同步器,在没有大量线程竞争的情况,通过简化版入队少了几步操作,提高了速度
    1. 不知道各位小伙伴有没有想过除了 CAS 以外,其他的操作会不会涉及并发问题?这里给个思路,并发问题都是因为同个资源上的共享读写造成的对于一个等待队列来说,大家竞争的都是 **HEAD/TAIL** 。至于每个节点的属性,只要设置 HEAD/TAIL 成功就没问题啦~
    |

    小北-老师.jpeg | | —- | —- | | | |

  4. 进入 acquireQueued 的节点都会一直请求获取锁,直到被取消或者被中断

    1. // AbstractQueuedSynchronizer#acquireQueued
    2. final boolean acquireQueued(final Node node, int arg) {
    3. boolean failed = true; // 悲观判断
    4. try {
    5. // 记录是否被interrupt过
    6. boolean interrupted = false;
    7. for (;;) {
    8. // !!!!分析片段0 !!!!
    9. // 获取当前节点的前驱节点
    10. final Node p = node.predecessor();
    11. if (p == head && tryAcquire(arg)) { // 如果前驱节点是HEAD,尝试acquire
    12. setHead(node); // 将node设置为HEAD
    13. p.next = null; // 将原来的HEAD,即p的next属性清空。防止node无法GC
    14. failed = false; // failed设为false
    15. return interrupted;
    16. }
    17. // 如果自己的前驱节点并不是HEAD
    18. // 设置状态并准备进入阻塞
    19. if (shouldParkAfterFailedAcquire(p, node) &&
    20. parkAndCheckInterrupt())
    21. interrupted = true;
    22. }
    23. } finally {
    24. // !!!!分析片段1 !!!!
    25. // 最终判断
    26. if (failed)
    27. // 取消accquire
    28. cancelAcquire(node);
    29. }
    30. }

    接下来看看 shouldParkAfterFailedAcquire() ,这部分涉及到 NodewaitStatus

    1. // AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire()
    2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    3. int ws = pred.waitStatus; // 获取前驱节点的waitStatus
    4. if (ws == Node.SIGNAL) // 如果前驱节点是SIGNAL,那么自己(node)就进入阻塞
    5. return true;
    6. if (ws > 0) { // 如果前驱节点已经被取消了,将头尾没有取消的节点连接起来
    7. // 然后再重新判断一波
    8. // !!!!分析片段2 !!!!
    9. do {
    10. node.prev = pred = pred.prev;
    11. } while (pred.waitStatus > 0);
    12. pred.next = node;
    13. } else {
    14. // 当前驱节点的waitStatus为0或者PROPAGATE时,设置前驱节点为SIGNAL并重新尝试
    15. // 一开始所有的节点的waitStatus都为0,所以都会进入else块
    16. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    17. }
    18. return false;
    19. }

    该方法都是针对前驱节点的状态进行操作的,所以可以得到这么一个规律:
    点击查看【processon】
    只要有新节点入队,新节点的前驱节点如果不是 SIGNAL ,就将前驱节点设置为 SIGNAL 并让自己进入阻塞状态。

当当当~又是我~第四阶段的代码比较长,但是逻辑还是简单的。我挑了三个感觉有坑的片段进行分析:
- 分析片段0:为什么要通过前驱节点来判断?这是什么意思
- 分析片段1:这部分涉及取消上锁的逻辑,后面讲
- 分析片段2:这里不会有并发问题吗?这部分处理的范围主要在当前节点之前的所有空节点,遇到非空节点就停下来;又因为前面(除HEAD)都处于阻塞状态,每个节点都只处理它前面的一个节点,所以不必担心有并发问题


小北-老师.jpeg

总结一下, AQS 上锁时如果没有线程竞争,就不会初始化同步队列(说明同步队列是懒加载的);如果发生了线程竞争,没有抢到锁的剩余线程会被封装为 Node 并被连接起来,串成一个链式双向队列,此时等待中的线程都会处于阻塞状态,并等待自己的前驱节点唤醒自己。

释放锁

本节内容比较简单,主要就是涉及 waitStatus 的变化以及 HEAD 节点和 HEAD 后继节点的变化:
点击查看【processon】

  1. 当A线程调用了 ReentrantLock.release() (其本质就是 AQS#release() )

    public final boolean release(int arg) {
     // 调用子类的实现
     if (tryRelease(arg)) {
         // 获取头节点,此时的头结点只有next属性仍然存在,thread、prev属性均被清除
         // 详细看setHead()方法
         Node h = head;
         // 如果HEAD部位空,而且waitStatus不为0
         if (h != null && h.waitStatus != 0)
             unparkSuccessor(h); // 解除后续节点的阻塞
         return true;
     }
     return false;
    }
    
  2. 接下来看看 tryRelease() ,这个方法是由 ReentrantLock 实现的:

    protected final boolean tryRelease(int releases) {
     // 计算当前线程持有锁的个数
     int c = getState() - releases;
     // 防止未持有锁的线程瞎搞
     if (Thread.currentThread() != getExclusiveOwnerThread())
         throw new IllegalMonitorStateException();
     boolean free = false;
     if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null);
     }
     setState(c);
     return free;
    }
    

    tryRelease() 的关键点如下所示:

  • tryRelease() 传入的参数:如果 state 减去 releases 参数不为0,依然无法释放锁。这是因为 ReentrantLock 是可重入锁,要把所有的锁释放了才能真正释放锁
  1. 最后看看 unparkSuccessor() ,该方法的主要作用就是为了唤醒后继节点:
    private void unparkSuccessor(Node node) {
     int ws = node.waitStatus; // 获取HEAD
     if (ws < 0)
         compareAndSetWaitStatus(node, ws, 0); // 将HEAD的waitStatus设置为0
     /*
      * 获取头结点后面的结点s
      * 如果结点s是null或者s已经被取消了
      * 就从后往前遍历,直到找到一个满足条件的结点??
      */
     Node s = node.next;
     if (s == null || s.waitStatus > 0) {
         s = null;
         for (Node t = tail; t != null && t != node; t = t.prev)
             // 查找最靠近HEAD的 且 waitStatus<=0 的节点
             if (t.waitStatus <= 0)
                 s = t;
     }
     // 如果s != null,则唤醒s
     if (s != null)
         LockSupport.unpark(s.thread);
    }
    
    简单来说就是如果后继节点被取消了或者为 null ,就去找离自己最近的可唤醒的节点,找到后唤醒;被唤醒的节点在 acquireQueued() 方法里再次尝试获取锁。

取消/中断

本节介绍方法 cancelAcquire() ,该方法都出现在 acquireXXX() 最后的 finally 块中,而且需要 failed 属性为 true (在获取锁的过程中被打断了),才会进入 cancelAcquire() 。而可以取消的结点,要么是定时器到时间了,要么是线程被中断了:
cancelAcquire1.png

/**
 * 10s过后,假设A线程仍持有锁,而B线程的tryLock()到期,那么B线程因为
 * throw new InterruptedException()而跳出for循环进入finally块,准备执行
 * cancelAcquire()
 * PS:此情况下C尚未到期,因为方便写:)
 */
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    // 获取当前结点的前驱结点
    Node pred = node.prev;
    /* 判断前驱结点是否为取消状态;
     * 如果是取消状态的话,就一直向前查找,直到找到非CANCELLED状态的结点
     */
    // 本示例里pred是A结点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    /* 不管三七二十一,来了这个方法,这个结点就注定要设置成CANCELLED
     * 那会不会有其他线程同时对这个结点的状态进行操作?
     * 有也没关系,它的顺序无论在其他CAS写操作之前还是之后最终都会设置为
     */
    // ①到这一步,本示例里B结点的ws转为CANCELLED 
    node.waitStatus = Node.CANCELLED;
    // 如果当前结点是尾结点,那么就将predNext(从后到前第一个非CANCELLED状态的结点)设置为尾结点
    // 本示例里B结点不是尾结点,跳过if进入else
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); 
    } else {
        int ws;
        // 如果pred不是头结点,就设置状态为SIGNAL
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {  
            Node next = node.next;
            /* 当前结点是否有后继结点
             * 若有后继结点且不是CANCELLED状态,则将后继结点和pred相连
             */
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        /* 
         * 如果pred是头结点就唤醒后继的非CANCELLED结点
         */
        } else {
            // ② 唤醒后面的非CANCELLED状态的结点
            unparkSuccessor(node);
        }
        /* 很骚的操作,一般都是设置null来help gc,这里采用循环引用当新一波的  
         * tryAcquire()之后,会将CANCELLED的引用清理掉,所以最后就变成不可达的
         * 引用,自然就被gc了
         */
        node.next = node; 
    }
}

流程图大致是这样的:
cancelAcquire2.png
到这里,整个 cancelAcquire() 的流程结束了,最后的 unparkSuccessor() 方法唤醒C,具体的可以参考前面的 release() 一节。

其他上锁操作

AQS 不仅提供了常规的 acquire() 还提供了支持中断acquireInterruptibly()支持计时tryAcquireNanos 。使用这些方法进入等待队列的节点(线程)在被中断/超时后会被取消,即 cancelAcquire()

可中断的上锁 - acquireInterruptibly

acquireInterruptibly() 方法支持用户去中断,它的内部逻辑比较简单:首先尝试一下 tryAcquire() ,如果没有竞争,那获取锁成功;如果失败了,则进入等待队列。

// AbstractQueuedSynchronizer#acquireInterruptibly()
public final void acquireInterruptibly(int arg)
            throws InterruptedException {  // 注意!他之类把 InterruptedException 抛出来了
        if (Thread.interrupted()) // 如果检测到了当前线程已经被中断,就直接抛出
            throw new InterruptedException();
        if (!tryAcquire(arg))    // 尝试tryAcquire()
            doAcquireInterruptibly(arg);  // 可中断的acqurie()
    }

如果 tryAcquire() 失败了,这个线程就要准备好进入等待队列,即调用 doAcquireInterruptibly() 。该方法的逻辑 acquireQueued() 基本一致,主要区别就是被中断后抛出了中断异常:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 处理状态 && 阻塞并检查中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // 抛出异常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可超时的上锁 - tryAcquireNanos

这部分逻辑和 acquire() 有异曲同工之妙,基本逻辑都差不多:

  1. 首先尝试上锁—— tryAcquire() ,如果成功了就不用创建等待队列了;若发生了竞争且竞争失败了就进入 doAcquireNanos()
  2. doAcquireNanos() 仅是在 acquireQueued() 基础上增加了计时功能

首先看 tryAcquireNanos() :先检查中断状态,如果已经中断了直接抛出异常;然后尝试tryAcquire() ,如果 tryAcquire() 失败了,就进入等待队列

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

随后看 doAcquireNanos() ,该方法在 acquireQueued() 基础上加了时限。唯一注意的点就是这里用到了自旋,在时间没超过 AQS.spinForTimeoutThreshold 之前,一直处于 for 循环(其实也就1000纳秒的时间处于自旋)。当超过自旋时间,进入 LockSupport.parkNanos() ,故名思意,就是带计时的 park() ~

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L) // 设置的时间小于0,直接返回
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; // 超时时限
    final Node node = addWaiter(Node.EXCLUSIVE); // 创建一个新的结点,注意EXCLUSIVE
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 如果当前时间超过了时限,就当获取锁失败,返回false
                return false;
            // 如果剩余超时的时间小于 spinForTimeoutThreshold 就没必要阻塞,直接自选
            // 如果剩余超时的时间大于 spinForTimeoutThreshold 就进入阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // 带计时功能的park
            if (Thread.interrupted()) // 发现已经中断了,就抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
总结一下“上锁操作”,“上锁操作”在独占模式下分为三种面对开发人员的API: acquire()acquireInterruptibly()tryAcquireNanos() ,它们分别支持了普通上锁、可中断上锁、超时上锁,主要的流程都是先尝试 tryAcquire() ,如果获取锁失败才考虑进入等待队列。

小北-老师.jpeg

公平锁实现原理

前面讲过了“非公平锁”的实现原理,我们可以知道这样一个流程:
lock() -> AQS#tryAcquire() -> 具体实现
而“公平”、“非公平”就是在这个“具体实现”里完成的。对于“非公平”来说,只需要正常acquire()即可,就算发生了抢占也没事;对于“公平”来说,如果等待队列里有其他等待线程,则需要排队。由于整个队列本身就是FIFO的,所以只需要保证tryAcquire()时不要发生插队即可,那么事实上也确实是这么做的:

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

如果发现这个等待队列里还有其他元素,则返回true;此时外部的判断:

static final class FairSync extends Sync {
    ...
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 如果队列中有其他元素,则 !hasQueuedPredecessors() == false
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        ...
        return false;
    }
}