1. 简介

JDK1.5 之前,控制线程同步的方式是使用 synchronized 关键字,可作用于函数声明或函数实现内。synchronized 完全依赖 JVM 的实现,且性能低下。从 JDK1.5 引入了 ReentrantLock 之后,Java 处理并发的能力得到了较大的提高,而在 JDK1.6,synchronized 方式的锁机制也默认设置为自旋锁从而提高了并发方面的性能。

即使 synchronized 同步机制性能得到了提高,但依然建议使用 ReentrantLock 实现多线程之间的同步控制,因为 ReentrantLock 具备更多程序可控的选项,如公平锁非公平锁,或中断线程竞争锁,这些都是 synchronized 无法提供,至少至今并未提供相关方法控制。

举一个真实的例子,背景是公司产品有一个功能集成了微信,并将程序部署到 Tomcat webapps 目录下,该程序引用了一个 github 上的开源类库,而部署了该程序的客户环境偶然出现 Tomcat 假死的情况。追溯到源码,代码片段如下

  1. public String getAccessToken(boolean forceRefresh) throws WxErrorException {
  2. ... ...
  3. synchronized (globalAccessTokenRefreshLock) {
  4. String url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?...";
  5. HttpGet httpGet = new HttpGet(url);
  6. ... ...
  7. }
  8. return accessToken;
  9. }

首先感谢作者为开源付出,而上述代码存在一个问题,就是 HttpGet 请求不设置 ConnectTimeout 和ReadTimeout 使得出现网络问题后所有线程将会阻塞在这个 getAccessToken 方法上,导致 Tomcat 线程池里的可用线程被耗尽造成 Tomcat 假死的现象。后来编写了一个适配器,重写这个 getAccessToken 方法,改为使用 ReentrantLock 的 tryLock(long, TimeUnit) 即在有限时间内如果不能成功竞争锁则返回原来的 AceessToken 解决。

代码如下,注意 timeout 的时间单位为纳秒,因为面对高速运算的 CPU,纳秒级别的超时时间才能控制得更加准确

  public String getAccessToken(boolean forceRefresh) throws WxErrorException {
    ... ...
    if (lock.tryLock() || lock.tryLock(timeout, TimeUnit.NANOSECONDS)) {
      try {
        String url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?...";
        HttpGet httpGet = new HttpGet(url);
        // 注意 HttpGet 设置了 ConnectTimeout 和 ReadTimeout
        // ... ...
        return newAccessToken;
      } finally {
        lock.unlock();
      }
    }
    return originalAccessToken;
  }

2. 类型

重入锁,英文名为 ReentrantLock,它是一种自旋锁(spinlocks),可以划分为公平锁非公平锁两类,顾名思义,公平锁就是按照线程的请求顺序执行,而非公平锁则是允许新线程请求时竞争锁。

公平锁只能够保证线程执行的先后顺序,并不能保证线程调度的先后顺序,也就是说,调度线程对锁的访问请求因系统或程序原因具有非常大的不确定性。但公平锁能够减少锁的竞争,并保证每一个线程竞争获取锁的时间相近,且不会造成“饥饿”线程(即阻塞时间较长)。

3. 使用

注意在使用上 lock.lock() 与 try{}finally{} 的语序问题,lock.lock() 的调用必须保证在 try{}finally{} 之前。假设我们将 lock.lock() 的调用放到 try{}finally{} 逻辑体中,如果 lock.lock() 竞争锁失败后,却依然尝试调用 lock.unlock() 方法,将会抛出 IllegalMonitorStateException 异常,源码分析章节部分再作分析。下面将举例公平锁和非公平锁的使用例子。

3.1. 公平锁

class X {
   private final ReentrantLock lock = new ReentrantLock(true); // true 表示公平锁
   // ... ...
   public void doFunc() {
     lock.lock();  // 在获取锁之前一直阻塞
     try {
       // ... 获取锁后的函数体
     } finally {
       lock.unlock();
     }
   }
 }

3.2. 非公平锁

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ... ...
   public void doFunc() {
     lock.lock();  // 在获取锁之前一直阻塞
     try {
       // ... 获取锁后的函数体
     } finally {
       lock.unlock();
     }
   }
 }

使用重入锁其实都非常简单,公平锁和非公平锁在使用上的主要区别在于构造方法,如果构造参数中传入了 true 则表示公平锁,默认情况下是 false 即非公平锁,以下是 ReentrantLock 的构造方法的源码,不需要过多的注释就能直接看懂(fair 的意思是公平,unfair 的意思是不公平)。值得一提的是,因为公平锁更加损耗系统资源,并降低服务的吞吐量(display lower overall throughput),所以非公平锁是重入锁无参构造函数的默认实现方式。

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

4. 数据结构

重入锁的实现是基于双向链表,其数据结构并不是 Doug Lea(重入锁作者)一意孤行的决定,而是参考了 “CLH” (Craig, Landin, and Hagersten) Queue Lock 上的队列锁实现,尽管 Java 重入锁的实现已经与这里的描述存在较大差别,但不阻碍 CLH Queue Lock 发挥指导性作用。如下图是重入锁的具体数据结构表现形式。

ReentrantLock.png

虽然是常见的双向链表,但其实包含了一些线程概念,每一个节点 Node 都包含一条线程,具体解析如下

  1. Running Node 表示该节点的线程已获取锁且正在运行
  2. Parked Node 表示该节点的线程因获取不了锁而被阻塞
  3. Head 指向了双向队列的头节点,该节点才能可以释放锁且通知下一个正常状态的节点,即 head.next 节点
  4. Tail 指向双向队列的尾节点,若新请求的线程无法通过竞争获取锁,则追加到双向队列的末端并使得 Tail 指针指向这个新节点。注意,Tail 节点设置也存在并发竞争的情况,这是为什么公平锁更加损耗性能的原因

着重说明 Parked Node 节点,在该节点上的线程均处于阻塞状态,到底是什么外力让线程阻塞?这是由于该节点上的线程尝试获取该锁,但多次竞争锁失败后自行调用系统方法阻塞。

5. 实现原理

接下来将会剖析重入锁的核心实现原理,其实理解重入锁的原理并不困难,但单纯的通过代码了解其实现原理还是存在一定的难度,这里将尽量的使用图解的方式进行说明。希望重入锁的实现原理能够对你有所启发,在其他项目中能够借鉴。

图解中需要用到 unparkpark 操作,对应的解释如下

操作 类库 备注
unpark java.util.concurrent.locks.LockSupport.unpark() 唤醒阻塞中的线程,或使得下一次调用 park 方法尝试阻塞线程失效
park java.util.concurrent.locks.LockSupport.park() 阻塞该线程,如果调用 park 方法前调用了 unpark 方法,将不会阻塞此线程

5.1. 持有锁

在数据结构分析中已经提到,持有锁的线程处于双向链表的 Head 节点,且该节点上的线程处于运行状态,并负责唤醒后继非取消状态的节点,如下图示

image.png

5.2. 释放锁

当前运行的 Head 节点线程释放锁后,将唤醒 Head 的后继节点,注意该后继节点必须处于正常状态(非取消状态)。Head 节点线程释放锁后 _unpark _后续正常状态的节点方法,能够唤醒使用 park 后处于阻塞状态的线程,锁的释放和唤醒过程,如下图示

image.png

5.3. 锁竞争

清楚了持有锁和释放锁的过程,再看如何实现锁的竞争,为了简化锁的竞争,这里假设只有一条线程请求,注意实际上请求的线程数都多于一条。

5.3.1. 公平锁

公平锁的存在就是为了减少线程对锁的竞争,每一条请求的线程都必须保证公平,因此每一条请求的线程都必须追加到双向链表的结尾,但存在 Tail 指针的竞争。如下图示,Barging Thread 不对锁产生竞争,而是直接追加到双向链表的结尾位置。

image.png

公平锁虽然不对锁进行竞争,但需要对设置 Tail 节点发生竞争,特别需要注意的是 Barging Thread 追加到 Tail 是一个死循环的过程,也就是说并发大的情况下存在大量的 Barging Thread 对设置 Tail 进行竞争,这些非阻塞状态的线程必然增加 CPU 对线程上下文切换次数,从而降低并发处理的性能。

5.3.2. 非公平锁竞争

相对于公平锁竞争,非公平锁的竞争就相对要复杂一些,需要分两种情况讨论,分别是新请求的线程竞争锁成功及新请求的线程竞争锁失败两种情况。

竞争锁成功的过程,如下图是 Barging Thread 与 Head 后继节点竞争的过程

image.png

Barging Tread 成功获取锁后就必须唤醒 Head 的后继节点,但有可能出现以下这种情况,当 Barging _Thread 请求到来,且 Head 后继节点刚好被唤醒,这时 Barging Thread 获取了锁权限,上图就是这类场景,而 Barging Thread 通过 LockSupport.unpark _企图唤醒 Head 后继节点的时候,Head 后继节点并未调用 LockSupport.park 操作阻塞当前节点上线程,这就可能出现双链表中的所有节点上的线程都处于阻塞状态,如下图所示,Barging Thread 运行完毕并释放锁的时候,Head 的下一个节点还处于 Running 状态

image.png

为了解决链表上的所有节点都处于阻塞状态,Java 中的 unpark 与 park 操作可无视先后顺序,也就是说先 park 再 unpark 能够正常唤醒线程,而先进行 unpark,下一次 park 操作将不会阻塞线程,意思是对线程执行了 unpark 操作之后,后续的一个 park 操作将失效(阻塞无效)。

以下引用 java.util.concurrent.locks.LockSupport.unpark() 操作的 Java 注释

Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block. This operation is not guaranteed to have any effect at all if the given thread has not been started.

重点是 “Otherwise, its next call to park is guaranteed not to block.” 这一句,大概的意思是“否则,保证下一次调用 park 操作不会阻塞”,注意这里的 park 是指 LockSupport.park() 方法调用。如下图所示
image.png

竞争锁失败的过程,Barging Thread 竞争锁失败之后的过程与公平锁处理过程一致,都需要将线程追加到双向列表结尾并自行阻塞,如下图示

image.png

6. 源码分析

现在我们基于 JDK 1.8,对源码进行分析,如果分析每一行代码,这里的篇幅将会很长,所以这里挑选了最核心的代码作为分析的对象。其实我们也没有必要分析每一行的代码,这是因为 JDK 的代码一直在迭代变化,而只有核心的代码不会产生较大变化,如果发生了重构,那么这篇《重入锁 | ReentrantLock》技术分享也就相当于过时了。

6.1. 锁竞争

我们通过分析非公平锁的竞争源码,不对公平锁的竞争源码进行分析,因为公平锁根本不对锁发生竞争关系。先来看非公平锁继承的静态抽象类 ReentrantLock.Sync, 该对象是一个继承 AbstractQueuedSynchronizer (简称 AQS )的抽象类。

abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();
    // ... ...
}

AQS 是一个同步锁的核心实现模块,相当于一个抽象骨架实现类(abstract skeletal implementation class),更多的,人们称它为模板设计模式中的模板类,不管哪种称呼都是为了帮助子类实现大部分的同步逻辑,减轻子类的实现负担。如下代码所示,Sync 只要保留 lock() 抽象方法留给 FairSync(公平锁) 和 NonfairSync(非公平锁)单独实现即可。如下就是 NonfaireSync 的 lock() 实现源码。

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
}

NonfairSync 的 lock() 实现逻辑就是一开始就尝试竞争锁,失败后进入 AQS 的 acquire(1) 方法,acquire(1) 其实现源码如下

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

其中 tryAcquire(int) 由公平锁和非公平锁各自的实现,如果 tryAcquire(arg) 竞争锁失败后将通过 addWaiter(Node.EXCLUSIVE), arg) 追加到队列的结尾,然后再通过 acquireQueued(Node, arg) 尝试竞争锁,如果三次竞争锁失败后则调用 LockSupport.park 阻塞当前节点上的请求线程。

再看 tryAcquire(int) 的源码实现,简单粗暴的先判断当前锁状态是否被占用(getState() 返回 0 表示未被占用),如未被占用则通过 compareAndSetState(0, acquires) 尝试竞争锁;如被占用则比较当前线程与占用锁的线程是否相同,一般的,这种比较是判断是否重入竞争锁的检查,这也是重入锁(ReentrantLock)的来由。

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 尝试竞争锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 判断是否重入
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

tryAcquire(arg) 竞争锁失败后,则将竞争锁的线程利用一个节点(Node)包装起来并追加到双向列表的尾部,对应的实现源码就是如下的 addWaiter(Node.EXCLUSIVE) 方法。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // ... ...
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

compareAndSetTail(pred, node) 就是线程竞争设置 AQS Tail 指针的地方,如果这里并没有成功地将新节点设置为 Tail,则进入 enq(node) 方法,这是一个死循环,直到新节点成功设置到 Tail。

对于循环竞争设置 Tail 变量,就是重复调用 compareAndSetTail(t, node) 直到将 Tail 指向当前节点为止的过程。但 _t == null_ 的条件判读又是怎么回事?代码的意图是,当 Tail 为空的时候同时设置 Head 节点并将 Tail 指向 Head 节点,这个时候 Head 和 Tail 指向的是同一个节点。

那为什么一定要设置 Head 空节点呢?我们可以回到 实现原理 5.2. 释放锁 的“当前运行的 Head 节点线程释放锁后,将唤醒 Head 的后继节点” 那句分析。即只有 Head 的后继节点才能被唤醒并获得竞争锁的资格,所以 Head 节点,即使为空节点也是必不可少的。

通过调用 addWaiter(Node) 成功将当前节点设置为 Tail 节点之后,最后来到最复杂的部分,就是 acquireQueued(Node, int) 的方法,简单的概括就是,Head 后续节点尝试竞争锁,非 Head 后续节点的线程则进入阻塞状态,然后 Head 后续节点的线程被唤醒再尝试竞争锁,重复这个过程,直到最后竞争锁成功并将当前节点设置为 Head。对应的 shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 就是为了判断是否需要阻塞当前请求线程。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // ... ...
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

6.2. 锁释放

比起锁竞争,锁释放的源码就相对比较简单了,通过 tryRelease(arg) 释放锁,简单的将 AQS 的 state 设置为 0 即可完成锁的释放。完成锁的释放后,随即调用 unparkSuccessor(h) 唤醒 Head 后继节点。

public class ReentrantLock implements Lock, java.io.Serializable {
    // ... ...
    public void unlock() {
        sync.release(1);
    }
}
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // ... ...
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    // ... ...
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

注意锁释放源码中以下代码片段,回应 3. 使用lock.lock() 的调用必须保证在 try{} finally{} 之前” 的原因,如果当前线程未获取锁但调用 lock.unlock() 方法时将会产生 IllegalMonitorStateException 异常。

    if (Thread.currentThread() != getExclusiveOwnerThread())
       throw new IllegalMonitorStateException();

小结

至此,重入锁的机制原理已经介绍完毕,这里只介绍核心部分,对于 java.util.concurrent.locks.ReentrantLock 还有不同的锁竞争机制,如 1. 简介 中使用到的 tryLock(long timeout, TimeUnit unit) 避免无限等待其次还有 lockInterruptibly() 可被打断,避免永久阻塞。

最后值得一提的是,synchronized 同步修饰符已经不适合于 JDK 1.5 后使用了,更多的是,我们应该选择使用 ReentrantLock(重入锁) / ReentrantReadWriteLock(共享重入锁)作为首选方案。如果你对并发编程非常熟悉,也可通过 CAS(compare and swap)的方式自行实现同步处理,或者继承 AQS 实现定制的同步锁。

参考

The java.util.concurrent Synchronizer Framework(AQS) Paper
JSR-166: Concurrency Utilities
“CLH” (Craig, Landin, and Hagersten) Queue Lock Paper