6.1 锁与队列的关系
- CLH锁的内部队列
- 分布式锁的内部队列
- AQS的内部队列
6.2 AQS的核心成员
6.2.1 状态标志位
//同步状态,使用 volatile保证线程可见
private volatile int state;
state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都可以得到最新值。AQS提供了getState()、setState()来获取和设置同步状态,具体如下:
// 获取同步的状态
protected final int getState() {
return state;
}
// 设置同步的状态
protected final void setState(int newState) {
state = newState;
}
// 通过CAS设置同步的状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量叫exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的get()和set()方法,具体如下:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
//表示当前占用该锁的线程
private transient Thread exclusiveOwnerThread;
// 省略get/set方法
}
6.2.2 队列节点类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:
static final class Node {
/**节点等待状态值1:取消状态*/
static final int CANCELLED = 1;
/**节点等待状态值-1:标识后继线程处于等待状态*/
static final int SIGNAL = -1;
/**节点等待状态值-2:标识当前线程正在进行条件等待*/
static final int CONDITION = -2;
/**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/
static final int PROPAGATE = -3;
//节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
//普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
volatile int waitStatus;
//节点所对应的线程,为抢锁线程或者条件等待线程
volatile Thread thread;
//前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
volatile Node prev;
//后继节点
volatile Node next;
//若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上
//此属性指向下一个条件等待节点,即其条件队列上的后继节点
Node nextWaiter;
...
}
6.2.3 FIFO双向同步队列
AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。
/*首节点的引用*/
private transient volatile Node head;
/*尾节点的引用*/
private transient volatile Node tail;
6.2.4 JUC显式锁与AQS的关系
6.2.5 ReentrantLock与AQS的组合关系
- ReentrantLock与AQS的组合关系
ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer {...}
ReentrantLock为了支持公平锁和非公平锁两种模式,为Sync又定义了两个子类,具体如下:
final static class NonfairSync extends Sync {...}
final static class FairSync extends Sync {...}
ReentrantLock提供了两个构造器,具体如下:
public ReentrantLock() { //默认的构造器
sync = new NonfairSync(); //内部使用非公平同步器
}
public ReentrantLock(boolean fair) { //true 为公平锁,否则为非公平锁
sync = fair ? new FairSync() : new NonfairSync();
}
由ReentrantLock的lock()和unlock()的源码可以看到,它们只是分别调用了sync对象的lock()和release()方法。
public void lock() { //抢占显式锁
sync.lock();
}
public void unlock() { //释放显式锁
sync.release(1);
}
-
6.3 AQS中的模板模式
6.3.1 模板模式
6.3.2 一个模板模式的参考实现
模板模式的参考实现代码 ```java package com.crazymakercircle.demo.lock; import com.crazymakercircle.util.Print; public class TemplateDemo {
static abstract class AbstractAction
{
/**
* 模板方法:算法骨架
*/
public void tempMethod()
{
Print.cfo("模板方法的算法骨架被执行");
beforeAction(); // 执行前的公共操作
action(); // 调用钩子方法
afterAction(); // 执行后的公共操作
}
/**
* 执行前
*/
protected void beforeAction()
{
Print.cfo("准备执行钩子方法");
}
/**
* 钩子方法:这里定义为一个抽象方法
*/
public abstract void action();
/**
* 执行后
*/
private void afterAction()
{
Print.cfo("钩子方法执行完成");
}
}
//子类A:提供了钩子方法实现
static class ActionA extends AbstractAction
{
/**
* 钩子方法的实现
*/
@Override
public void action()
{
Print.cfo("钩子方法的实现 ActionA.action() 被执行");
}
}
//子类B:提供了钩子方法实现
static class ActionB extends AbstractAction
{
/**
* 钩子方法的实现
*/
@Override
public void action()
{
Print.cfo("钩子方法的实现 ActionB.action() 被执行");
}
}
public static void main(String[] args)
{
AbstractAction action = null;
//创建一个 ActionA 实例
action= new ActionA();
//执行基类的模板方法
action.tempMethod();
//创建一个 ActionB 实例
action= new ActionB();
//执行基类的模板方法
action.tempMethod();
}
}
<a name="jHzxv"></a>
## 6.3.3 AQS的模板流程
- Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。
- Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。
<a name="MhyL3"></a>
## 6.3.4 AQS中的钩子方法
自定义同步器时,AQS中需要重写的钩子方法大致如下:
- tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false
- tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false
- tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
- isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。
1. tryAcquire独占式获取锁
```java
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- tryRelease独占式释放锁
```java
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
3. tryAcquireShared共享式获取
```java
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
- tryReleaseShared共享式释放
```java
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
5. 查询是否处于独占模式
```java
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
6.4 通过AQS实现一把简单的独占锁
6.4.1 简单的独占锁的UML类图
6.4.2 简单的独占锁的实现
package com.crazymakercircle.demo.lock.custom;
// 省略import
public class SimpleMockLock implements Lock
{
//同步器实例
private final Sync sync = new Sync();
// 自定义的内部类:同步器
// 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态
// AbstractQueuedSynchronizer.state=1 表示锁没有被占用
// AbstractQueuedSynchronizer.state=0 表示锁没已经被占用
private static class Sync extends AbstractQueuedSynchronizer
{
//钩子方法
protected boolean tryAcquire(int arg)
{
//CAS更新状态值为1
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//钩子方法
protected boolean tryRelease(int arg)
{
//如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
{
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//如果锁的状态为没有占用
if (getState() == 0)
{
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
//设置状态
setState(0);
return true;
}
}
//显式锁的抢占方法
@Override
public void lock()
{
//委托给同步器的acquire()抢占方法
sync.acquire(1);
}
//显式锁的释放方法
@Override
public void unlock()
{
//委托给同步器的release()释放方法
sync.release(1);
}
// 省略其他未实现的方法
}
}
6.4.3 SimpleMockLock测试用例
package com.crazymakercircle.demo.lock;
// 省略import
public class LockTest
{
@org.junit.Test
public void testMockLock()
{
// 每个线程的执行轮数
final int TURNS = 1000;
// 线程数
final int THREADS = 10;
//线程池,用于多线程模拟测试
ExecutorService pool = Executors.newFixedThreadPool(THREADS);
//自定义的独占锁
Lock lock = new SimpleMockLock();
// 倒数闩
CountDownLatch countDownLatch = new CountDownLatch(THREADS);
long start = System.currentTimeMillis();
//10个线程并发执行
for (int i = 0; i < THREADS; i++)
{
pool.submit(() ->
{
try
{
//累加 1000 次
for (int j = 0; j < TURNS; j++)
{
//传入锁,执行一次累加
IncrementData.lockAndFastIncrease(lock);
}
Print.tco("本线程累加完成");
} catch (Exception e)
{
e.printStackTrace();
}
//线程执行完成,倒数闩减少一次
countDownLatch.countDown();
});
}
// 省略等待并发执行完成、结果输出的代码
}
}
6.5 AQS锁抢占的原理
6.5.1 显式锁抢占的总体流程
6.5.2 AQS模板方法:acquire(arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
6.5.3 钩子实现:tryAcquire(arg)
private static class Sync extends AbstractQueuedSynchronizer
{
//钩子方法
protected boolean tryAcquire(int arg)
{
//CAS更新状态值为1
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
6.5.4 直接入队:addWaiter
private Node addWaiter(Node mode) {
//创建新节点
Node node = new Node(Thread.currentThread(), mode);
// 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
Node pred = tail;
// 队列不为空的时候
if (pred != null) {
node.prev = pred;
// 先尝试通过AQS方式修改尾节点为最新的节点
// 如果修改成功,将节点加入队列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
enq(node);
return node;
}
static final class Node {
/** 常量标识:标识当前的队列节点类型为共享型抢占 */
static final Node SHARED = new Node();
/** 常量标识:标识当前的队列节点类型为独占型抢占 */
static final Node EXCLUSIVE = null;
// 省略其他代码
}
6.5.5 自旋入队:enq
/**
* 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
* 如果依然不存在,就把当前线程作为head节点
* 插入节点后,调用acquireQueued()进行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS 操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS 操作tail指针,仅仅被enq()调用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
6.5.6 自旋抢占:acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋检查当前节点的前驱节点是否为头节点,才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 节点中的线程循环地检查自己的前驱节点是否为 head 节点
// 前驱节点是head时,进一步调用子类的tryAcquire(…)实现
if (p == head && tryAcquire(arg)) {
// tryAcquire()成功后,将当前节点设置为头节点,移除之前的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
// 如果需要挂起
// 调用parkAndCheckInterrupt()方法挂起当前线程,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 若两个操作都是true,则置true
}
} finally {
//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)
//那么取消节点在队列中的等待
if (failed)
//取消请求,将当前节点从队列中移除
cancelAcquire(node);
}
}
6.5.7 挂起预判:shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(
Node pred, Node node) {
int ws = pred.waitStatus; // 获得前驱节点的状态
if (ws == Node.SIGNAL) //如果前驱节点状态为SIGNAL(值为-1)就直接返回
return true;
if (ws > 0) { // 前驱节点以及取消CANCELLED(1)
do {
// 不断地循环,找到有效前驱节点,即非CANCELLED(值为1)类型节点
// 将pred记录前驱的前驱
pred = pred.prev;
// 调整当前节点的prev指针,保持为前驱的前驱
node.prev = pred;
} while (pred.waitStatus > 0);
// 调整前驱节点的next指针
pred.next = node;
} else {
// 如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
// 设置前驱状态之后,此方法返回值还是为false,表示线程不可用,被阻塞
}
return false;
}
6.5.8 线程挂起:parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 调用park()使线程进入waiting状态
return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
}
6.6 AQS的两个关键点:节点的入队和出队
6.6.1 节点的自旋入队
private Node enq(final Node node) {
for (;;) { //自旋入队
Node t = tail;
if (t == null) {
//队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
6.6.2 节点的出队
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在前驱节点上自旋
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// (1)前驱节点是头节点
// (2)通过子类的tryAcquire()钩子实现抢占成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头节点,之前的头节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 省略park(无限期阻塞)线程的代码
}
} finally {
// 省略其他
}
}
AQS释放锁时是如何唤醒后继线程的呢?AQS释放锁的核心代码如下:
public final boolean release(long arg) {
if (tryRelease(arg)) { //释放锁的钩子方法的实现
Node h = head; //队列头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后继线程
return true;
}
return false;
}
unparkSuccessor的核心代码如下:
private void unparkSuccessor(Node node) {
// 省略不相关代码
Node s = node.next; //后继节点
// 省略不相关代码
if (s != null)
LockSupport.unpark(s.thread); //唤醒后继节点的线程
}
6.7 AQS锁释放的原理
6.7.1 SimpleMockLock独占锁的释放流程
6.7.2 AQS模板方法:release()
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
6.7.3 钩子实现:tryRelease()
//钩子方法
protected boolean tryRelease(int arg)
{
//如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
{
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//如果锁的状态为没有占用
if (getState() == 0)
{
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
//设置状态
setState(0);
return true;
}
}
6.7.4 唤醒后继:unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 获得节点状态,释放锁的节点,也就是头节点
//CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)
//若头节点状态小于0,则将其置为0,表示初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到后头的一个节点
if (s == null || s.waitStatus > 0) {
// 如果新节点已经被取消CANCELLED(1)
s = null;
// 从队列尾部开始,往前去找最前面的一个 waitStatus 小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
//唤醒后继节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
6.8 ReentrantLock的抢锁流程
6.8.1 ReentrantLock非公平锁的抢占流程
6.8.2 非公平锁的同步器子类
ReentrantLock为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法lock()的源码如下:
static final class NonfairSync extends Sync {
//非公平锁抢占
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 省略其他
}
6.8.3 非公平抢占的钩子方法:tryAcquire(arg)
static final class NonfairSync extends Sync {
//非公平锁抢占的钩子方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 省略其他
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接获得锁的状态
int c = getState();
if (c == 0) {
// 如果内部队列首节点的线程执行完了,它会将锁的state设置为0
// 当前抢锁线程的下一步就是直接进行抢占,不管不顾
// 发现state是空的,就直接拿来加锁使用,根本不考虑后面继承者的存在
if (compareAndSetState(0, acquires)) {
// 1. 利用CAS自旋方式判断当前state确实为0,然后设置成acquire(1)
// 这是原子性的操作,可以保证线程安全
setExclusiveOwnerThread(current);
// 设置当前执行的线程,直接返回true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 2.当前的线程和执行中的线程是同一个,也就意味着可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示当前锁被1个线程重复获取了nextc次
return true;
}
// 否则就返回false,表示没有成功获取当前锁,进入排队过程
return false;
}
// 省略其他
}
6.8.4 ReentrantLock公平锁的抢占流程
6.8.5 公平锁的同步器子类
static final class FairSync extends Sync {
//公平锁抢占的钩子方法
final void lock() {
acquire(1);
}
// 省略其他
}
6.8.6 公平抢占的钩子方法:tryAcquire(arg)
static final class FairSync extends Sync {
//公平抢占的钩子方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //锁状态
if (c == 0) {
if (!hasQueuedPredecessors() && //有后继节点就返回,足够讲义气
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
6.8.7 是否有后继节点的判断
6.9 AQS条件队列
6.9.1 Condition基本原理
public class ConditionObject implements Condition, java.io.Serializable {
//记录该队列的头节点
private transient Node firstWaiter;
//记录该队列的尾节点
private transient Node lastWaiter;
}
在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以调用newCondition()创建两个等待队列,具体如下:
private Lock lock = new ReentrantLock();
//创建第一个等待队列
private Condition firstCond = lock.newCondition();
//创建第二个等待队列
private Condition secondCond = lock.newCondition();
6.9.2 await()等待方法原理
在await()方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的核心代码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // step 1
int savedState = fullyRelease(node); // step 2
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // step 3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) // step 4
&& interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) //step 5
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成,该方法具体如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点取消,重新定位尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个新Node,作为等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将新Node加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
6.9.3 signal()唤醒方法原理
//唤醒
public final void signal() {
//如果当前线程不是持有该锁的线程,就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒头节点
}
//执行唤醒
private void doSignal(Node first) {
do {
//出队的代码写的很巧妙,要看仔细
//first出队,firstWaiter 头部指向下一个节点,自己的nextWaiter
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null; //如果第二节点为空,则尾部也为空
//将原来头部first的后继置空,help for GC
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//将被唤醒的节点转移到同步队列
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // step 1
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // step 2:唤醒线程
return true;
}