非公平锁实现原理
ReentrantLock
默认实现为非公平锁,所谓的公平和非公平主要是AQS的同步器使用不一样,观察ReentrantLock
的构造方法
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public ReentrantLock() {
sync = new NonfairSync();
}
加锁流程
竞争线程执行流程
该流程需要配合源码以及AQS原理这篇文章一起看Thread-1
执行了
CAS
尝试将state
由 0 改为 1,结果失败- 执行
AQS
的acquire()
方法并进入tryAcquire()
逻辑, tryAcquire()
调用非公平竞争锁方法尝试获取锁,如果此时锁被释放,则Thread-1
通过CAS
修改state
为1,则获取锁成功;否则获取失败返回false
,执行下一步- 接下来进入
addWaiter()
逻辑,构造Node
队列,将当前线程封装成Node
并加入队尾并返回当前线程节点,初始加入时对列中没有头结点和尾结点,程序创建一个空的Node
给head
和tail
- 当前线程已经进入等待队列尾部,调用
acquireQueued
方法【acquireQueued
是一个自旋逻辑,每次自旋都有可能尝试获取锁,前提是当前节点是head
的后继节点】,找到当前节点的前驱节点,判断前驱节点是否为head
节点(本例中刚好是)- 如果是
head
节点,再次调用tryAcquire()
,尝试获取锁,如果此时前面的线程释放了锁,则获取锁成功,修改当前节点为head
节点; - 如果不是
head
节点,则执行shouldParkAfterFailedAcquire
- 如果是
执行到这里当前线程已经最多尝试3次获取锁了【第一次执行
lock
的时候,第二次执行acquire()->tryAcquire()
,第三次执行acquireQueued()->tryAcquire()
】,但是仍然未获取到锁,那么当前线程节点需要找到一个可以唤醒自己的前驱节点,即先获取到前驱节点,- 如果前驱节点的状态为
SIGNAL
则返回true
,因为前驱节点也在等待,并且负责唤醒后继线程 - 如果前驱节点放弃了或者取消了,那么一直查找到状态正常的一个节点,返回
false
,并进行acquireQueued
的下一轮自旋 如果前驱节点状态正常,则修改前驱节点状态为
SIGNAL
,并继续进行acquireQueued
的下一轮自旋
- 如果前驱节点的状态为
如果到此时还没有获取到锁,那么执行
parkAndCheckInterrupt
阻塞当前线程(阻塞前一定找到状态为SIGNAL
的前驱节点),进入等待队列
- 出现竞争时的队列状态
- 图中红色三角表示该
Node
的waitStatus
状态,其中 0 为默认正常状态 - Node 的创建是懒惰的
- 其中第一个
Node
称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
- 图中红色三角表示该
- 如果此时又有其他线程过来竞争资源则队列状态变为
源码实现
final void lock() {
//尝试使用cas修改state,有竞争的时候会失败
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//竞争失败,走AQS的获取锁方法
acquire(1);
}
//这里是AQS 继承过来的方法
public final void acquire(int arg) {
//尝试一次tryAcquire,如果失败了执行addWaiter
if (!tryAcquire(arg) &&
//将线程加入队尾,并寻找前驱唤醒节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
//自我中断
selfInterrupt();
}
}
//这里是NonfairSync本身的方法
protected final boolean tryAcquire(int acquires) {
//调用非公平竞争
return nonfairTryAcquire(acquires);
}
//这里是非公平竞争实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果此时竞态条件被释放了
if (c == 0) {
//cas修改state状态
if (compareAndSetState(0, acquires)) {
//设置线程属主
setExclusiveOwnerThread(current);
return true;
}
}
// 如果竞态条件依然存在,判断是不是锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败
return false;
}
解锁流程
Thread-0
释放锁,进入 release
流程,如果成功
- 设置
exclusiveOwnerThread
为null
- 设置
state
= 0
线程释放锁执行流程
Thread-0
释放锁,进入release
流程- 执行Sync的
tryRelease
流程,正常来说,tryRelease()
都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了tryRelease
执行成功后修改exclusiveOwnerThread
为null
- 修改
state = 0
,如果Thread-0
进行了锁重入,则state
本次修改不为0,只是将锁重入次数减一
tryRelease
执行成功返回true
执行AQS
的unparkSuccessor
流程,开始唤醒后继线程- 将当前线程(
Thread-0
)的waitState
状态修改为0
- 将当前线程(
b. 找到当前线程的后继线程,并判断后继线程的waitState
状态,如果后继线程为null
,或者取消了,那么就从队尾往前找,找到后继线程
c. 唤醒找到的线程
源码实现
//Sync的unlock
public void unlock() {
sync.release(1);
}
//Sync 继承自 AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
//Sync的tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//保险起见做的检验
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// AQS的实现
private void unparkSuccessor(Node node) {
//node为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0){
//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
}
//找到下一个需要唤醒的结点s
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 当前节点的后继节点被取消了或者为空,则应该从后向前找状态正常的
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒
LockSupport.unpark(s.thread);
}
可重入原理
重点在加锁时的else if
发现锁的持有者还是当前线程,就对状态进行累加,以及解锁时的getState() - releases
直到state=0
才算解锁成功
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//Sync的tryRelease
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
//保险起见做的检验
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可打断原理
可打断模式
- 如果没获取到锁,则进入
doAcquireInterruptibly
流程 doAcquireInterruptibly
流程与正常获取锁的流程(acquireQueued
)几乎一致,不同的地方在于,acquireQueued
在捕获到中断时,只是记录中断标记,然后继续自旋;而doAcquireInterruptibly
捕获到中断直接抛出中断异常,不会再进行下一轮循环。//ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//继承自父类的方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//如果没获取到锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//继承自父类的方法
private void doAcquireInterruptibly(int arg)throws InterruptedException {
//当前线程封装成节点,并加入队尾
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入自旋,与不可打断相比,只是在此抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不可打断模式
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了 ,不可打断模式的原理在上节已经说明,即
acquireQueued
在捕获到中断时,只是记录中断标记,然后继续自旋final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
//CAS自旋
for (; ; ) {
//拿到前驱节点
final Node p = node.predecessor();
//如果前驱是head,即该结点已成第二个节点,那么便有资格去尝试获取资源
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
setHead(node);
// setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
p.next = null;
// 成功获取资源
failed = false;
//返回等待过程中是否被中断过
return interrupted;
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
if (failed)
cancelAcquire(node);
}
}
公平锁实现原理
公平锁与非公平锁的区别就是在尝试获取锁的逻辑上,即在争抢锁的时候会调用
hasQueuedPredecessors
方法,该方法的作用就是查看是否有比当前线程等待时间更久的线程存在,如果有就不争抢锁,如果没有就竞争锁 ```java static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//主要区别处,在尝试获取锁之前,查看是否有比当前线程等待时间更久的线程
//或者说查看当前线程是否有前驱线程(正常状态)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() { //获取尾结点
Node t = tail;
//获取头结点
Node h = head;
Node s;
return
//h!=t说明还有节点存在
h != t &&
//查看头结点后有没有节点
((s = h.next) == null ||
//头结点的后继节点不为空,且头结点的后继线程不是当前线程
s.thread != Thread.currentThread());
}
<a name="ferQP"></a>
# 条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 `ConditionObject`
<a name="fL2sG"></a>
## await 流程
```java
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
开始 Thread-0
持有锁,调用 await
,进入 ConditionObject
的 addConditionWaiter
流程,创建新的 Node 状态为 -2(Node.CONDITION
),关联 Thread-0
,加入等待队列尾部
进入
addConditionWaiter
流程,添加一个Node
至等待队列private Node addConditionWaiter() {
//最后一个等待节点
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 所有已取消的 Node 从队列链表删除,并更新lastWaiter
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
进入 AQS 的
fullyRelease
流程,释放同步器上的锁,可能存在锁重入,需要将 state 全部释放final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//把所有的锁重入值全部释放了
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
进入
isOnSyncQueue
逻辑,判断node
节点是否在同步对列中,- 如果不在同步队列,则
unpark AQS
队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread-1
竞争成功
- 如果不在同步队列,则
b. 判断此次线程的唤醒是否因为线程被中断, 若是被中断,转移节点到同步对列中
signal 流程
//唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//唤醒 - 将没取消的第一个节点转移至 AQS 队列
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环
!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null);
}
假设 Thread-1
要来唤醒 Thread-0
进入 ConditionObject
的 doSignal
流程,取得等待队列中第一个 Node
,即Thread-0
所在 Node``
执行 transferForSignal
流程,将该 Node
加入 AQS
队列尾部,将``Thread-0
的 waitStatus
改为 0,Thread-3
的waitStatus
改为 -1Thread-1
释放锁,进入 unlock
流程