概述
ReentrantLock
默认使用非公平锁,不过,你也可以通过构造方法传入 true 构建基于公平锁的 ReentrantLock
。
内部类
由于 ReentrantLock
具有公平锁和非公平锁,所以会有两个内部类,分别是 FairSync 对应公平锁,NonfairSync 对应非公平锁。
FairSync
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
/**
* 在公平模式下,尝试获取一下锁。如果获取锁成功,返回true,否则返回false。
* 以下情况会返回true:① 没有其它线程竞争锁。②当前线程重入。
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// #1 获取资源的的状态
int c = getState();
// #2 c==0,意味着没有线程持有锁
if (c == 0) {
// #3 虽然此刻锁是可用的,但是需要讲究FIFO,
// 查看队列是否有线程存在排队,true表示已经有其它线程需要锁,直接 return false
// 如果返回false,说明此刻符合获取锁的资格,记住,是此刻
if (!hasQueuedPredecessors() &&
// #4 既然有了资格,那么就尝试一个吧,CAS更新state
// 如果更新失败,直接返回false,说明本轮与其它线程竞争修改落败,重新来过
compareAndSetState(0, acquires)) {
// #5 修改state成功,将当前线程设置为独占
setExclusiveOwnerThread(current);
// #6 返回 true 表示本次尝试获取锁成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// #7 进到这个分支,说明是线程重入,直接更新 state 即可
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// #8 本次尝试失败,重新来过
return false;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
/**
* true:h != t,说明队列中有线程,再判断 head.next 是否为空,如果不为空,直接返回 true,如果不为空
* 再判断首个线程是否为当前线程(重入)
* false:h == t, 队列为空,直接返回
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
NonfairSync
非公平锁,首先直接去抢占一次。抢占失败,才调用 acquire(1)
。在这个方法中,如果发现 state = 0
,那么也直接抢占。
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
// 这里会先去抢占一次,如果失败,才执行 acquire(1)
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* 在非公平模式下,尝试获得锁
*
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// #1 c==0,意味着此刻没有线程占有锁资源
if (c == 0) {
// #2 二话不说,直接尝试修改 state 的值
// 哪管队列是否有其它排队的线程,自己先抢占了
if (compareAndSetState(0, acquires)) {
// #3 修改成功,返回true。太蛮横了,但是效率高呀,不需要判断队列情况
setExclusiveOwnerThread(current);
return true;
}
}
// #4 重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// #5 本轮抢不到锁,直接返回false
return false;
}
Condition
Condition 表示条件,比如当某个条件满足后就可以唤醒线程继续执行代码,也可以当某个条件满足后阻塞线程。Condition 通常用于生产者-消费者场景。比如
/**
* 生产者-消费者线程
*/
public class TestCondition {
static int SIZE = 10;
static int count = 0;
public static void main(String[] args) throws Exception {
final ReentrantLock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final List<Integer> dataList = new ArrayList<>(SIZE);
Thread producer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
lock.lock();
// 生产者生产效率高
while (dataList.size() == SIZE) {
// 满了
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产数据
dataList.add(count++);
// 唤醒生产者
notEmpty.signal();
} finally {
lock.unlock();
}
}
});
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
lock.lock();
// 生产者生产效率高
while (dataList.size() == 0) {
// 队列中没有数据
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费数据
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费较慢,消费数据:" + dataList.remove(0));
notFull.signal();
} finally {
lock.unlock();
}
}
});
producer.start();
consumer.start();
consumer.join();
}
}
输出如下:
消费者消费较慢,消费数据:0
消费者消费较慢,消费数据:1
消费者消费较慢,消费数据:2
消费者消费较慢,消费数据:3
消费者消费较慢,消费数据:4
消费者消费较慢,消费数据:5
消费者消费较慢,消费数据:6
消费者消费较慢,消费数据:7
消费者消费较慢,消费数据:8
...
使用 Condition 时,必须持有锁。lock.lock()
。这个和 Object 类中的方法具有相似的语义,需要先持有某个对象的 Monitor 锁才能执行 wait()、notify() 或 notifyAll() 方法。
可多次从 ReentrantLock
实例中获取多个 ConditionObject 实例。
ConditionObject
它是 Condition 接口的实现类,内部核心结构是一个双向链表:
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
//...
}
在介绍 AQS 的时候,它的内部也有一个双向链表,用来存放尝试获取锁而阻塞的线程。而 ConditionObject 内部也是使用一个单向链表存放等待某个条件成立的阻塞的线程的单向链表,称为条件队列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
Node nextWaiter;
还记得 Node 节点的内部有一个 nextWaiter 引用么,这个就是实现单向链表的条件队列的基础。
await()
- 将当前线程包装为Node节点并插入到等待队列的末尾。
因为 ReentrantLock 是可重入的,所以需要进行完全释放独占锁的操作。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await() /** * 实现一个可中断的等待操作 * 1. 如果当前线程已经被中断过,抛出 InterruptException * 2. 保存 getState 返回的 lock state 的值 * 3. 调用 release() 并附带 lock state,如果失败,抛出 IllegalMonitorStateException * 4. 线程会被阻塞直到:① 其它线程调用 signal() 或 signalAll()。② 线程被中断。 * 5. 通过调用定制的 acquire() 重新获取锁。 * 6. 在步骤 4 中发生中断操作,则会抛出 InterruptException */ public final void await() throws InterruptedException { // #1 判断线程是否已经被中断过 if (Thread.interrupted()) // 直接抛出异常 throw new InterruptedException(); // #2 将当前线程包装为Node节点并插入到等待队列中 Node node = addConditionWaiter(); // #3 完全释放独占锁,savedState 所以当前线程的锁状态, // 在线程唤醒后恢复操作时用到 int savedState = fullyRelease(node); int interruptMode = 0; // #4 如果不在「阻塞队列」中, while (!isOnSyncQueue(node)) { // #5 isOnSyncQueue() 返回false才会走到这里,false 表示没有在阻塞队列中。 // 挂起线程。唤醒操作需要另一个线程来操作 LockSupport.park(this); // #6 中断判断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // #7 线程被唤醒且已存在阻塞队列了 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter /** * 将当前线程对应的节点入队,插入队尾。 * 单个条件内的操作是线程安全的,没有并发操作 */ private Node addConditionWaiter() { Node t = lastWaiter; // #1 如果尾部的Condition已被取消,就需要将其清除 if (t != null && t.waitStatus != Node.CONDITION) { // #2 这个方法遍历整个条件队列,将已取消的所有节点都清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } // #3 node在初始化的时候,指定 ws 状态为 Node.CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // #4 插入队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
unlinkCancelledWaiters
从头到尾遍历整个条件队列,将已取消的节点从队列中移除。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 如果节点状态不是Node.CONDITION的话,意味着这个节点被取消了 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
fullyRelease(Node)
// java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease /** * 完全释放当前线程所持有的锁,因此存在锁重入,即 state > 1,所以我们记录重入次数,代码叫 savedState, * 所以 fullyRelease 函数返回的是当线程的锁状态。 * 当一个线程在不持有锁的情况下,调用 Condition.await() 方法就会抛出异常 */ final int fullyRelease(Node node) { boolean failed = true; try { // #1 获取此刻锁的 state int savedState = getState(); // #2 将state修改为0 if (release(savedState)) { // #3 成功释放锁,返回旧的锁状态,以备恢复时所使用 failed = false; return savedState; } else { // #4 修改失败,抛出异常 throw new IllegalMonitorStateException(); } } finally { // 如果出现失败,节点也被取消 if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue(Node)
这个函数是判断 Node 是否存在阻塞队列中。返回 true 意味着节点 Node 在阻塞队列中,false 表示不存在阻塞队列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue /** * 判断node是否已经被移动到阻塞队列 */ final boolean isOnSyncQueue(Node node) { // #1 节点状态为初始状态、或前驱节点为null,说明还在条件队列中,直接返回false // 因为节点被移动到阻塞队列时 ws 状态会被修改为 0 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // #2 如果当前 node 存在后继节点,那铁定在阻塞队列。 // 因为我们操作的 node 是在条件队列的末尾,并且没有并发操作,所以一旦发现 // 它存在后驱节点的话,那意味着在阻塞队列中。 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // #3 从队列尾部开始从后往前遍历,如果找到相等的节点,说明节点node在阻塞队列中。否则不存在。 return findNodeFromTail(node); }
signal()
唤醒等待时间最长的线程,被唤醒的线程需要重新获得独占锁。 ```java // java.util.concurrent.locks.Condition /**
- 唤醒等待时间最长的线程
- 当条件满足后,就需要唤醒线程,被唤醒的线程会从条件队列转移到阻塞队列 */ public final void signal() { // #1 调用 signal() 方法必须持有独占锁,所以这里会做一下安全判断 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // #2 唤醒线程 doSignal(first); }
protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don’t need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); }
<a name="kGlxh"></a>
#### doSignal
在条件队列从头往后遍历,找出第一个符合转移的 Node。因为某些线程可能会主动取消排队,所以还需要遍历获取。
```java
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
do {
// #1 等待队列中只有当前一个 Node,需要更新 lastWaiter 为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// #2 断连
first.nextWaiter = null;
// #3 transferForSinal(first) 转移到阻塞队列,如果转移不成功,会从下一个节点继续尝试
// 如果转移成功,则退出 while 循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal
将节点 node 插入到阻塞队列中。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
// #1 CAS 更新 ws 为 0,如果更新失败,说明节点取消排队
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// #2 自旋插入阻塞队列的尾部,返回值 p 表示 node 在阻塞队列的前驱节点
Node p = enq(node);
// #3 获取前驱节点的 ws 状态值
int ws = p.waitStatus;
// #4 如果 ws > 0,说明前驱节点取消阻塞队列排队,需要对当前线程解除阻塞。
// 如果 ws <= 0,说明前驱节点状态正常,需要修改前驱节点状态为 -1,这样当它释放锁时就会唤醒后驱节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
线程唤醒
当前驱线程调用 LockSupport.unpark(thread)
后,后驱线程被唤醒,并尝试获得锁,继续向下执行。