一、AQS(AbstractQueuedSynchronizer)
AQS是一个抽象同步框架,实现一个依赖状态的同步器。java.util.concurrent包中,比如等待队列、条件队列、独占获取、共享获取等,都依赖AQS。
AQS特性
AQS内部属性
volatile int state
- state表示资源的可用状态
State三种访问方式:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
同步等待队列
AQS当中的同步等待队列也称CLH队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:- 当前线程,如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
- 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
- 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
- 调用await方法阻塞线程;
- 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
1. 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列**尾部**添加一个节点,所以调用Condition#await方法的时候**必须持有锁**。
1. 调用Condition#signal方法会将Condition队列的**首节点**移动到同步等待队列**尾部**,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁),所以调用Condition#signal方法的时候**必须持有锁**,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
二、ReentrantLock应用与原理
ReentrantLock是一种基于AQS框架的应用实现,功能类似于synchronized,是一种互斥锁,可以保证线程安全。
相对于 synchronized, ReentrantLock具备如下特点:
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
对比区别
synchronized | ReentrantLock |
---|---|
JVM层次的锁实现 | JDK层次的锁实现 |
锁状态是无法在代码中直接判断 | 通过ReentrantLock#isLocked,获取锁定状态 |
非公平锁 | 可以是公平也可以是非公平 |
不可以被中断的 | 可以被中断 |
发生异常时,synchronized会自动释放锁 | 在finally块中显示释放锁 |
特定的情况下对于已经在等待的线程是后来的线程先获得锁 | 已经在等待的线程是先来的线程先获得锁 |
1.ReentrantLock应用测试
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread thread0 = new Thread(() -> {
lock.lock();
try {
System.out.println("输出测试0");
} finally {
lock.unlock();
}
});
thread0.start();
Thread thread1 = new Thread(() -> {
lock.lock();
try {
System.out.println("输出测试1");
} finally {
lock.unlock();
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
lock.lock();
try {
System.out.println("输出测试2");
} finally {
lock.unlock();
}
});
thread2.start();
}
1.1.可中断
@Slf4j
public class ReentrantLockDemo3 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
try {
lock.lockInterruptibly();
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("t1等锁的过程中被中断");
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
log.debug("线程t1执行中断");
} finally {
lock.unlock();
}
}
1.2.超时:立即失败
@Slf4j
public class ReentrantLockDemo4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
// 注意: 即使是设置的公平锁,此方法也会立即返回获取锁成功或失败,公平策略不生效
if (!lock.tryLock()) {
log.debug("t1获取锁失败,立即返回false");
return;
}
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
1.2超时指定时间失败
@Slf4j
public class ReentrantLockDemo4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
//超时
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("等待 1s 后获取锁失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
2.ReentrantLock获取锁流程
三、ReentrantLock源码解析
1.重要变量
/** 标记节点为共享模式 */
static final Node SHARED = new Node();
/** 标记节点为独占模式 */
static final Node EXCLUSIVE = null;
/** 标记在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待(即从队列中移除) */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking
* 标记后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消
* 将会唤醒后继节点,使后继节点的线程得以运行。
*/
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition
* 标记节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后
* 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* 标识下一次共享式同步状态获取将会被无条件地传播下去
*/
static final int PROPAGATE = -3;
/**
* 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
* 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
* 即被一个线程修改后,状态会立马让其他线程可见。
*/
volatile int waitStatus;
2.获取锁逻辑
加锁:
阻塞需要for循环执行两次
for{
第一次循环:把阻塞元素前一个元素waitstatus设置为-1
第二次循环:阻塞当前线程(park)
}
/**
* 默认为非公平锁
*/
public void lock() {
sync.lock();
}
2.1.非公平式获取锁
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 1.立即执行获取锁操作。
* 2.失败,则执行常规获取锁操作
*/
final void lock() {
//立即尝试获取锁,如果当前state=0,则可以获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//常规获取锁操作
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
采用非公平方式,可以提高效率。可以直接由活跃线程获取锁,避免unpark唤醒耗时。
解锁过程
1.设置state=0
非公平模式下,可在此处插队
2.唤醒线程
3.刚唤醒线程,执行尝试获取锁逻辑
4.从等待队列中,移除刚唤醒的节点。
2.2.常规获取锁
/**
* 1.尝试获取锁
* 2.第一步失败,则创建等待节点,并入队
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//调用NonfairSync中获取锁实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 调用Sync非公平获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//处理锁重入场景
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2.3.创建等待节点
//创建等待节点,并插入CHL(双向)队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//将尾部节点暂存到pred中
Node pred = tail;
//队列为空时,则tail和head节点都为null,如果perd不为null,则同步队列不为空。
if (pred != null) {
//队列不为空,执行插入操作
node.prev = pred;//设置当前节点前驱为原队列尾节点
if (compareAndSetTail(pred, node)) {//通过cas设置当前节点为尾节点
pred.next = node;//之前尾节点指向当前节点
return node;
}
}
//当前同步队列为null,执行队列初始化,并将当前节点加入共同队列
enq(node);
return node;
}
2.4.初始化等待队列,并插入第一个元素
//1.第一次循环,判断队列是否为空,为空则初始化
//2.第二次循环,则将节点,插入刚初始化好的队列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//通过cas初始化CHL队列
tail = head;//头、尾节点指向同一个位置。
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//通过cas设置当前节点为尾节点
t.next = node;
return t;
}
}
}
}
2.5.阻塞加入队列中线程节点
//阻塞队列中的节点,将当前线程阻塞,等待获取锁(阻塞前如果是第一个节点,则再次尝试加锁)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取当前节点前驱节点
//如果当前节点前驱节点为head(则当前节点是队列中第一个有效节点),则再次尝试获取锁。(该节点是同步队列中的第一个节点,避免阻塞性能损耗,再次获取锁)
if (p == head && tryAcquire(arg)) {
//执行到此处,表示待阻塞节点,已经获取锁不必再阻塞,故将当前节点设置为同步队列的head(头)节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 第一次循环,(shouldParkAfterFailedAcquire方法)将当前【待阻塞】节点,前一个节点的waitStatus设置为-1,返回false
* 第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。
* 继续执行parkAndCheckInterrupt方法(1.执行线程阻塞,2.并检查是否有中断标识【可能在阻塞期间被设置了中断】)
**/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//***线程interrupt方法,会唤醒park阻塞的线程**/
interrupted = true;
}
} finally {
//发生异常时,failed才会true,然后标记队列中节点为Cancel状态
if (failed)
cancelAcquire(node);
}
}
2.5.1.设置前驱节点信号量并清理无效节点
//判断当前节点是否可以被阻塞
//第一轮循环时,当前待阻塞节点的【前一个节点】waitStatus修改为-1.signal=-1可被唤醒。为唤醒做准备,唤醒时,进行判断!=0
//第二次循环,当前待阻塞节点的前一个节点waitStatus=-1,则返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//大于0,表示失效节点
/*
* 循环清理等待队列中的失效节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 将前驱节点信号量设置为SIGNAL(-1)
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3.解锁逻辑
解锁
1.设置state=0
非公平模式下,可在此处插队
2.唤醒线程
3.刚唤醒线程,执行尝试获取锁逻辑
4.从等待队列中,移除刚唤醒的节点。
public void unlock() {
sync.release(1);
}
3.1.解锁
//调用AQS方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.1.1.尝试解锁
/*调用ReentrantLock方法
* 解锁:即还原lock初始状态
* 返回:解锁结果。成功:true;失败:false
**/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//必须获取锁的线程解锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//处理锁重入情况
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3.1.2.唤醒线程
/*
* 1.解锁场景,入参为head节点,修改head,waitStatus=0
*
**/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//如果后继节点不符合唤醒条件,则从队列尾部,查询队列最前端符合条件的待唤醒节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从队列尾部遍历,获取队列头部第一个waitStatus<0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒阻塞线程
if (s != null)
LockSupport.unpark(s.thread);
}
3.2.唤醒线程继续执行
//继续执行循环操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
/*
* 唤醒节点为队列第一个节点,故尝试获取锁。
* 公平锁场景:肯定可以获取锁
* 非公平锁场景:获取锁可能失败,重新阻塞
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取锁成功,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败,头结点设置为-1。(解锁时,会将头节点waitStatus设置为0)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//发生异常时,failed才会true
if (failed)
cancelAcquire(node);
}
}
网络总结图