一、AQS(AbstractQueuedSynchronizer)
AQS是一个抽象同步框架,实现一个依赖状态的同步器。java.util.concurrent包中,比如等待队列、条件队列、独占获取、共享获取等,都依赖AQS。
AQS特性
AQS内部属性
volatile int state
- state表示资源的可用状态
 
State三种访问方式:
- getState()
 - setState()
 - compareAndSetState()
 
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
 Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
同步等待队列
AQS当中的同步等待队列也称CLH队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:- 当前线程,如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
 - 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
 - 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
 
条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
- 调用await方法阻塞线程;- 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

1. 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列**尾部**添加一个节点,所以调用Condition#await方法的时候**必须持有锁**。1. 调用Condition#signal方法会将Condition队列的**首节点**移动到同步等待队列**尾部**,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁),所以调用Condition#signal方法的时候**必须持有锁**,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
二、ReentrantLock应用与原理
ReentrantLock是一种基于AQS框架的应用实现,功能类似于synchronized,是一种互斥锁,可以保证线程安全。
相对于 synchronized, ReentrantLock具备如下特点:
- 可中断
 - 可以设置超时时间
 - 可以设置为公平锁
 - 支持多个条件变量
 - 与 synchronized 一样,都支持可重入
 
对比区别
| synchronized | ReentrantLock | 
|---|---|
| JVM层次的锁实现 | JDK层次的锁实现 | 
| 锁状态是无法在代码中直接判断 | 通过ReentrantLock#isLocked,获取锁定状态 | 
| 非公平锁 | 可以是公平也可以是非公平 | 
| 不可以被中断的 | 可以被中断 | 
| 发生异常时,synchronized会自动释放锁 | 在finally块中显示释放锁 | 
| 特定的情况下对于已经在等待的线程是后来的线程先获得锁 | 已经在等待的线程是先来的线程先获得锁 | 
1.ReentrantLock应用测试
public static void main(String[] args) {Lock lock = new ReentrantLock();Thread thread0 = new Thread(() -> {lock.lock();try {System.out.println("输出测试0");} finally {lock.unlock();}});thread0.start();Thread thread1 = new Thread(() -> {lock.lock();try {System.out.println("输出测试1");} finally {lock.unlock();}});thread1.start();Thread thread2 = new Thread(() -> {lock.lock();try {System.out.println("输出测试2");} finally {lock.unlock();}});thread2.start();}
1.1.可中断
@Slf4jpublic class ReentrantLockDemo3 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();Thread t1 = new Thread(() -> {log.debug("t1启动...");try {lock.lockInterruptibly();try {log.debug("t1获得了锁");} finally {lock.unlock();}} catch (InterruptedException e) {e.printStackTrace();log.debug("t1等锁的过程中被中断");}}, "t1");lock.lock();try {log.debug("main线程获得了锁");t1.start();//先让线程t1执行try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}t1.interrupt();log.debug("线程t1执行中断");} finally {lock.unlock();}}
1.2.超时:立即失败
@Slf4jpublic class ReentrantLockDemo4 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();Thread t1 = new Thread(() -> {log.debug("t1启动...");// 注意: 即使是设置的公平锁,此方法也会立即返回获取锁成功或失败,公平策略不生效if (!lock.tryLock()) {log.debug("t1获取锁失败,立即返回false");return;}try {log.debug("t1获得了锁");} finally {lock.unlock();}}, "t1");lock.lock();try {log.debug("main线程获得了锁");t1.start();//先让线程t1执行try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} finally {lock.unlock();}}
1.2超时指定时间失败
@Slf4jpublic class ReentrantLockDemo4 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();Thread t1 = new Thread(() -> {log.debug("t1启动...");//超时try {if (!lock.tryLock(1, TimeUnit.SECONDS)) {log.debug("等待 1s 后获取锁失败,返回");return;}} catch (InterruptedException e) {e.printStackTrace();return;}try {log.debug("t1获得了锁");} finally {lock.unlock();}}, "t1");lock.lock();try {log.debug("main线程获得了锁");t1.start();//先让线程t1执行try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} finally {lock.unlock();}}
2.ReentrantLock获取锁流程
三、ReentrantLock源码解析
1.重要变量
/** 标记节点为共享模式 */static final Node SHARED = new Node();/** 标记节点为独占模式 */static final Node EXCLUSIVE = null;/** 标记在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待(即从队列中移除) */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking* 标记后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消* 将会唤醒后继节点,使后继节点的线程得以运行。*/static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition* 标记节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后* 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中*/static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate* 标识下一次共享式同步状态获取将会被无条件地传播下去*/static final int PROPAGATE = -3;/*** 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态* 使用CAS更改状态,volatile保证线程可见性,高并发场景下,* 即被一个线程修改后,状态会立马让其他线程可见。*/volatile int waitStatus;
2.获取锁逻辑
加锁:
阻塞需要for循环执行两次
for{
第一次循环:把阻塞元素前一个元素waitstatus设置为-1
第二次循环:阻塞当前线程(park)
}
/*** 默认为非公平锁*/public void lock() {sync.lock();}
2.1.非公平式获取锁
/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** 1.立即执行获取锁操作。* 2.失败,则执行常规获取锁操作*/final void lock() {//立即尝试获取锁,如果当前state=0,则可以获取锁if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else//常规获取锁操作acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}采用非公平方式,可以提高效率。可以直接由活跃线程获取锁,避免unpark唤醒耗时。解锁过程1.设置state=0非公平模式下,可在此处插队2.唤醒线程3.刚唤醒线程,执行尝试获取锁逻辑4.从等待队列中,移除刚唤醒的节点。
2.2.常规获取锁
/*** 1.尝试获取锁* 2.第一步失败,则创建等待节点,并入队*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//调用NonfairSync中获取锁实现protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}/*** 调用Sync非公平获取锁*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//处理锁重入场景else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
2.3.创建等待节点
//创建等待节点,并插入CHL(双向)队列尾部private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure//将尾部节点暂存到pred中Node pred = tail;//队列为空时,则tail和head节点都为null,如果perd不为null,则同步队列不为空。if (pred != null) {//队列不为空,执行插入操作node.prev = pred;//设置当前节点前驱为原队列尾节点if (compareAndSetTail(pred, node)) {//通过cas设置当前节点为尾节点pred.next = node;//之前尾节点指向当前节点return node;}}//当前同步队列为null,执行队列初始化,并将当前节点加入共同队列enq(node);return node;}
2.4.初始化等待队列,并插入第一个元素
//1.第一次循环,判断队列是否为空,为空则初始化//2.第二次循环,则将节点,插入刚初始化好的队列中private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))//通过cas初始化CHL队列tail = head;//头、尾节点指向同一个位置。} else {node.prev = t;if (compareAndSetTail(t, node)) {//通过cas设置当前节点为尾节点t.next = node;return t;}}}}
2.5.阻塞加入队列中线程节点
//阻塞队列中的节点,将当前线程阻塞,等待获取锁(阻塞前如果是第一个节点,则再次尝试加锁)final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();//获取当前节点前驱节点//如果当前节点前驱节点为head(则当前节点是队列中第一个有效节点),则再次尝试获取锁。(该节点是同步队列中的第一个节点,避免阻塞性能损耗,再次获取锁)if (p == head && tryAcquire(arg)) {//执行到此处,表示待阻塞节点,已经获取锁不必再阻塞,故将当前节点设置为同步队列的head(头)节点setHead(node);p.next = null; // help GCfailed = false;return interrupted;}/*** 第一次循环,(shouldParkAfterFailedAcquire方法)将当前【待阻塞】节点,前一个节点的waitStatus设置为-1,返回false* 第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。* 继续执行parkAndCheckInterrupt方法(1.执行线程阻塞,2.并检查是否有中断标识【可能在阻塞期间被设置了中断】)**/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//***线程interrupt方法,会唤醒park阻塞的线程**/interrupted = true;}} finally {//发生异常时,failed才会true,然后标记队列中节点为Cancel状态if (failed)cancelAcquire(node);}}
2.5.1.设置前驱节点信号量并清理无效节点
//判断当前节点是否可以被阻塞//第一轮循环时,当前待阻塞节点的【前一个节点】waitStatus修改为-1.signal=-1可被唤醒。为唤醒做准备,唤醒时,进行判断!=0//第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {//大于0,表示失效节点/** 循环清理等待队列中的失效节点*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** 将前驱节点信号量设置为SIGNAL(-1)*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
3.解锁逻辑
解锁
1.设置state=0
非公平模式下,可在此处插队
2.唤醒线程
3.刚唤醒线程,执行尝试获取锁逻辑
4.从等待队列中,移除刚唤醒的节点。
public void unlock() {sync.release(1);}
3.1.解锁
//调用AQS方法public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
3.1.1.尝试解锁
/*调用ReentrantLock方法* 解锁:即还原lock初始状态* 返回:解锁结果。成功:true;失败:false**/protected final boolean tryRelease(int releases) {int c = getState() - releases;//必须获取锁的线程解锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;//处理锁重入情况if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
3.1.2.唤醒线程
/** 1.解锁场景,入参为head节点,修改head,waitStatus=0***/private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//如果后继节点不符合唤醒条件,则从队列尾部,查询队列最前端符合条件的待唤醒节点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;//从队列尾部遍历,获取队列头部第一个waitStatus<0的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//唤醒阻塞线程if (s != null)LockSupport.unpark(s.thread);}
3.2.唤醒线程继续执行
//继续执行循环操作final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {/** 唤醒节点为队列第一个节点,故尝试获取锁。* 公平锁场景:肯定可以获取锁* 非公平锁场景:获取锁可能失败,重新阻塞*/final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {//获取锁成功,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//获取锁失败,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {//发生异常时,failed才会trueif (failed)cancelAcquire(node);}}
网络总结图

