目录
- LockSupport
- AQS
- ReentrantLock
- ReentrantReadWriteLock
-
LockSupport工具类
LockSupport 是个工具类,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport 类的方法的线程是不持有许可证的。LockSupport 是使用Unsafe 类实现的。
park
如果调用Locksupport.park()方法的线程已经拿到了与LockSupport关联的许可证,则调用Locksupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度(默认情况下调用线程是不持有许可证的), 也就是会被阻塞挂起。直到以下任意情况出现:
他线程调用unpark(Thread thread)方法并且将当前线程作为参数时,调用park 方法而被阻塞的线程会返回
- 其他线程调用了阻塞线程的interrupt()方法,设置了中断标志,调用park 方法而被阻塞的线程返回。需要注意,返回时不会抛In terruptedException 异常。
线程被虚假唤醒
public static void park() {
UNSAFE.park(false, 0L);
}
unpark
当一个线程调用unpark 时,如果参数thread线程没有持有thread 与LockSupport 类关联的许可证, 则让thread 线程持有。
如果thread 之前因调用park()而被挂起,则调用unpark 后,该线程会被唤醒。
如果thread 之前没有调用park ,则调用unpark方法后,再调用park方法会立刻返回。
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
public class Code02_LockSupportDemo2 {
public static void main(String[] args) {
Thread thread1 = new Thread(Code02_LockSupportDemo2::parkThread, "thread1");
thread1.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("mainThread unPark thread1");
LockSupport.unpark(thread1);
}
private static void parkThread() {
System.out.println(Thread.currentThread().getName() + "park");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + " unPark返回");
}
}
park 方法返回时不会告诉你因何种原因返回,所以调用者需要根据之前调用park 方法的原因,再次检查条件是否满足,如果不满足则还需要再次调用park 方法。
public class Code03_LockSupportDemo3 {
public static void main(String[] args) {
Thread thread1 = new Thread(Code03_LockSupportDemo3::parkThread, "thread1");
thread1.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("mainThread unPark thread1");
LockSupport.unpark(thread1);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("中断 thread1");
thread1.interrupt();
try {
thread1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束");
}
private static void parkThread() {
System.out.println(Thread.currentThread().getName() + "park");
while (!Thread.currentThread().isInterrupted()) {
LockSupport.park();
}
System.out.println(Thread.currentThread().getName() + " 中断返回");
}
}
parkNanos
和park 方法类似,如果调用park 方法的线程已经拿到了与LockSupport 关联的许可证,则调用LockSupport.parkNanos(long nanos)方法后会马上返回。
该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起nanos时间后修改为自动返回。
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
park(Object blocker)
当钱程在没有持有许可证的情况下调用park 方法而被阻塞挂起时,这个blocker 对象会被记录到该线程内部。
使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getB l ocker(T hread)方法来获取blocker 对象的,所以JDK 推荐我们使用带有blocker 参数的park 方法,并且blocker 被设置为this , 这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 设置线程的blocker变量
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
// 线程被激活后清除blocker对象
setBlocker(t, null);
}
// 用来存放park 方法传递的blocker对象,
// 也就是把blocker变量存放到了调用park方法的线程的成员变量里面。
volatile Object parkBlocker;
parkNanos(Object blocker,long nanos)
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
parkUntil(Object blocker, long deadline)
其中参数deadline 的时间单位为ms ,该时间是从19 7 0 年到现在某一个时间点的毫秒
值。
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
抽象同步队列 AQS
AbstractQueuedSynchronizer 抽象同步队列(AQS),是实现同步器的基础组件,java.util.concurrent.locks包中的锁的底层就是使用AQS实现。
AQS是一个FIFO的双向队列,内部通过head和tail记录队列头部和尾部元素,队列类型为Node。
head、tail
/**
等待队列的头节点,lazy Initialized。
除了初始化之外它只能通过方法setHead进行修改。
注意:如果head存在,它的waitStatus不能是CANCELLED。
*/
private transient volatile Node head;
/**
等待队列的头节点,lazy Initialized。
只能通过方法enq添加新的节点进行修改。
*/
private transient volatile Node tail;
/**
将队列头设置为节点,从而退出队列。仅由acquire方法调用。
为了GC,还会将未使用的字段置空,并抑制不必要的信号和遍历。
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**将节点插入队列,必要时初始化,返回前一个节点*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
当一个线程获取锁失败后该线程会被转换为Node 节点,然后就会使用enq(final Node node) 方法将该节点插入到AQS 的阻塞队列。
static final class Node {
/** 节点在共享模式下等待的标记 */
static final Node SHARED = new Node();
/** 节点在独享模式下的等标记 */
static final Node EXCLUSIVE = null;
/** waitStatus: 线程被取消了 */
static final int CANCELLED = 1;
/** waitStatus:线程需要被唤醒 */
static final int SIGNAL = -1;
/** waitStatus:线程在条件队列里面等待 */
static final int CONDITION = -2;
/**waitStatus :释放共享资源时需要通知其他节点 */
static final int PROPAGATE = -3;
/** 线程当前等待状态 */
volatile int waitStatus;
/** 当前节点的前面的节点 */
volatile Node prev;
/**当前节点后面的节点 */
volatile Node next;
/** 节点里的线程 */
volatile Thread thread;
/**
链接到下一个节点等待条件,或特殊值SHARED。
因为条件队列只在独占模式下被访问,所以我们只需要一个简单的链接队列来在节点等待条件时容纳节点。
然后它们被转移到队列中重新获取。由于条件只能是排他的,我们通过使用特殊的值来表示共享模式来保存字段。
*/
Node nextWaiter;
/** 节点是否在共享模式 */
final boolean isShared() {
return nextWaiter == SHARED;
}
/**返回前一个节点,如果为空则抛出NullPointerException */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- Node 中的thread 变量用来存放进入AQS队列里面的线程;
- Node中的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS 队列的;
- EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS 队列的;
- waitStatus 记录当前线程等待状态,可以为CANCELLED (线程被取消了)、SIGNAL ( 线程需要被唤醒)、CONDITION (线程在条件队列里面等待〉、PROPAGATE (释放共享资源时需要通知其他节点〕;
- prev 记录当前节点的前驱节点, next 记录当前节点的后继节点。
state
AQS中state变量,记录着同步状态信息,可通过getState 、setState 、compareAndSetState 函数修改其值。
- 对于ReentrantLock来说,state可以用来表示当前线程获取锁的可重入次数
- 对于ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数;低16位表示获取的写锁的线程的可重入次数。
- 对于Semaphore来说,state表示当前可以信号的个数
- 对于CountdownLatch来说,state表示计数器当前的值
/** 同步状态 */
private volatile int state;
/** 返回同步状态的当前值。该操作具有volatile读的内存语义。*/
protected final int getState() {
return state;
}
/** 设置同步状态的值。该操作具有volatile写的内存语义。*/
protected final void setState(int newState) {
state = newState;
}
/**如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。该操作具有volatile读写的内存语义。*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
获取锁和释放锁
对AQS来说,线程同步的关键是对状态值state进行操作,根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。
独占方式获取锁和释放锁
/**
以独占模式获取,忽略中断。
通过至少调用一次tryAcquire,成功返回来实现。
否则,线程排队,可能反复阻塞和解阻塞,调用tryAcquire直到成功。
这个方法可以用来实现Lock.lock方法。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
以独占模式获取,如果中断则终止。
首先检查中断状态,然后至少调用一次tryAcquire,成功返回。
否则,线程会排队,可能会反复阻塞和解除阻塞,调用tryAcquire,直到成功或线程被中断。
这个方法可以用来实现lock.lockinterruptible方法。
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
尝试以独占模式获取,如果被中断则终止,如果给定超时超时则失败。
首先检查中断状态,然后至少调用一次tryAcquire,成功返回。
否则,线程将被排队,可能会反复阻塞和解除阻塞,并调用tryAcquire,直到成功或线程被中断或超时过去。
此方法可用于实现方法Lock.TimeUnittryLock(long time,TimeUnit unit)。
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
以独占模式发布。通过解除一个或多个线程的阻塞来实现,如果tryRelease返回true。
这个方法可以用来实现Lock.unlock方法。
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 当一个线程调用acquire(int arg) 方法获取独占资源时,会首先使用tryAcquire 方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node. EXCLUSIVE 的Node 节点后插入到AQS 阻塞队列的尾部,并调用LockSupport. park( this)方法挂起自己。
- 当一个线程调用release(int arg)方法时会尝试使用tryRelease 操作释放资源,这里是设置状态变量state 的值,然后调用LockSupport.unpark(thread)方法激活AQS 队列里面被阻塞的一个线程(thread) 。被激活的线程则使用tryAcquire 尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS 队列并被挂起。
比如独占锁ReentrantLock 的实现, 当一个线程获取了ReentrantLock的锁后,在AQS内部首先使用CAS 操作把state状态值从0变为1 ,然后设置当前锁的持有者为当前线程;当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从l 变为2 ,也就是设置重入次数;而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS 阻塞队列后起。
共享方式获取锁和释放锁
/**
以共享模式获取,忽略中断。通过至少一次调用tryacquiresred来实现,成功返回。
否则,线程排队,可能反复阻塞和解阻塞,调用tryacquiresred,直到成功。
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
以共享模式获取,如果中断则终止。首先检查中断状态,然后至少调用一次tryacquiresred,成功返回。
否则,线程会排队,可能会反复阻塞和解阻塞,调用tryacquiresred,直到成功或线程被中断。
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
尝试以共享模式获取,如果被中断将终止,如果给定超时超时将失败。
首先检查中断状态,然后至少调用一次tryacquiresred,成功返回。
否则,线程会排队,可能会反复阻塞和解阻塞,并调用tryacquiresred,直到成功或线程被中断或超时过去。
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
以共享模式释放资源。
通过解除一个或多个线程的阻塞来实现,如果tryreleasshared返回true。
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 当线程调用acquireShared(int arg) 获取共享资源时,会首先使用tryAcquireShared尝试获取资源, 具体是设置状态变量state 的值,成功则直接返回,失败则将当前线程封装为类型为Node . SHARED 的Node 节点后插入到AQS 阻塞队列的尾部,并使用LockSupport. park( this)方法挂起自己。
- )当一个线程调用releaseShared(int a电)时会尝试使用tryReleaseShared 操作释放资源,这里是设置状态变量state 的值,然后使用LockSupport.unpark ( thread )激活AQS 队列里面被阻塞的一个线程(thread) 。被激活的线程则使用tryReleaseShared 查看当前状态变量state 的值是否能满足自己的需要,满足则该线程被撤活,然后继续向下运行,否则还是会被放入AQS 队列并被挂起。
比如Semaphore 信号量, 当一个线程通过acquire 方法获取信号量时,会首先看当前信号量个数是否满足需要, 不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS 获取信号量。
ConditionObject
AQS内部类ConditionObject,可以结合锁实现线程同步。
ConditionObject 可以直接访问AQS 对象内部的变量,比如state 状态值和AQS 队列
ConditionObject 是条件变量, 每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await 方法后被阻塞的线程, 这个条件队列的头、尾元素分别为firstWaiter 和lastWaiter 。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列第一个等待节点 */
private transient Node firstWaiter;
/** 条件队列最后一个等待节点 */
private transient Node lastWaiter;
/** 构造函数,获取ConditionObject实例 */
public ConditionObject() { }
/** 向等待添加一个新的等待节点 */
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判断最后一个等待节点线程是否被取消
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
删除和传输节点,直到命中不是取消状态的一个节点或null。
从信号中分离出来,部分是为了鼓励编译器在没有等待的情况下内联。
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**删除和传输所有节点。*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**从条件队列中取消已取消的等待节点的链接。仅在持有锁是调用*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
/**将等待时间最长的线程(如果存在)从该条件的等待队列移动到拥有锁的等待队列。*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 将条件队队列头元素移动到AQS队列
doSignal(first);
}
/**将所有线程从此条件的等待队列移动到所属锁的等待队列。*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**实现不可中断条件等待。
* 1、保存getState返回的锁状态。
* 2、以保存的状态作为参数调用release,如果失败则抛出IllegalMonitorStateException。
* 3、一直阻塞直到被唤醒
* 4、通过调用带有保存状态参数的acquire的特殊版本重新获取。 */
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
实现可中断条件等待。
1、如果当前线程被中断,抛出InterruptedException。
2、保存getState返回的锁状态。
3、以保存的状态作为参数调用release,如果失败则抛出IllegalMonitorStateException。
4、阻塞,直到有信号或中断。
5、通过调用带有保存状态参数的acquire的特殊版本重新获取。
6、如果在第4步中阻塞时被中断,则抛出InterruptedException。
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建新的节点并插入条件队列末尾
Node node = addConditionWaiter();
// 释放当前线程持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 调用park方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
带有等待时间的await方式
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
带有等待时间条件的await方式
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
带有等待时间条件的await方式
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
ReentrantLock
ReentrantLock 的底层是使用AQS 实现的可重入独占锁。在这里A QS 状态值为0 表示当前锁空闲,为大于等于l 的值则说明该锁己经被占用。
该锁内部有公平与非公平实现, 默认情况下是非公平的实现。另外,由于该锁是独占锁,所以某时只有一个线程可以获取该锁。
ReentrantLock构造方法
// 非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 传入true公平锁,false为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
由以上类图知道,NonfairSync、FairSync继承Sync,Sync继承AQS
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync
lock
- 当一个线程调用该方法时,说明该线程希望获取该锁。如果锁当前没有被其他线程占用并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程, 并设置AQS 的state值为1,然后直接返回。
- 如果当前线程之前己经获取过该锁,则这次只是简单地把AQS 的state值加1 后返回。
- 如果该锁己经被其他线程持有,则调用该方法的线程会被放入AQS 队列后阻塞挂起。
非公平锁lock实现
final void lock() {
// 默认AQS 的状态值为0,所以第一个调用Lock的线程会通过CAS 设置状态值为1, CAS 成功则表示当前线程获取到了锁,
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用AQS的acquire方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 调用父类Sync的nonfairTryAcquire方法
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前状态值
if (c == 0) {
// 如果当前值为0,第一次获取锁,尝试进行CAS操作获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是锁的持有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 当前锁不可用且当前线程不是锁持有线程,返回false
return false;
}
公平锁lock实现
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前锁的状态
if (c == 0) {
// 公平性策略
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是锁的持有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 锁不可用且当前线程不是锁的持有者
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//如果h==t 则说明当前队列为空,直接返回false。
//不相等且head下一个节点也不为空,说明AQS队列第一个等待节点,该节点线程不是当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
lockInterruptibly
该方法与lock()方法类似,它的不同在于,它对中断进行响应,就是当前线程在调用该方法时,如果其他线程调用了当前线程的interrupt方法, 则当前线程会抛出InterruptedException 异常, 然后返回。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 当前线程被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源
if (!tryAcquire(arg))
// 调用AQS可被中断的方法
doAcquireInterruptibly(arg);
}
trylock
尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁井返回true,
否则返回fal se 。注意,该方法不会引起当前线程阻塞。(直接调用sync.nonfairTryAcquire(1)返回结果,而不是调用AQS的acquire()方法,所有不会阻塞)
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
trylock(long timeout,TimeUnit unit)
尝试获取锁,与tryLock ()的不同之处在于,它设置了超时时间,如果超时时间到,没有获取到该锁则返回fal se 。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
// 调用AQS的tryAcquireNanos
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
unlock
尝试释放锁,如果当前线程持有该锁, 则调用该方法会让该线程对该线程持有的AQS
状态值减l , 如果减去l 后当前状态值为0 ,则当前线程会释放该锁, 否则仅仅减I 而己。
如果当前线程没有持有该锁而调用了该方法则会抛出Ill ega !MonitorStateException 异常。
public void unlock() {
sync.release(1); # 调用release方法,tryRelease 实现
}
protected final boolean tryRelease(int releases) {
// 当前state -1
int c = getState() - releases;
// 如果当前线程不是锁的持有者抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c==0 完全释放锁,清空锁的持有者
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// c!=0 重入锁,重入次数-1
setState(c);
return free;
}
读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock 采用读写分离的策略,允许多个线程可以同时获取读锁。
读写锁的内部维护了一个ReadLock 和一个WriteLock ,它们依赖Sync 实现具体功能。而Sync 继承自AQS ,并且也提供了公平和非公平的实现。下面只介绍非公平的读写锁实现。
我们知道AQS 中只维护了一个state 状态,ReentrantReadWriteLock 地使用state 的高16 位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态单位值
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 65536
// 共享锁线程最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 65535 0xffff
// 排它(写锁)锁掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 1111 1111 1111 1111
/** 返回的读锁线程数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回的写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 第一个获取到读锁的线程
private transient Thread firstReader = null;
// 第一个获取到读锁的线程的可重入次数
private transient int firstReaderHoldCount;
// 用来记录最后一个获取读锁的线程获取读锁的可重入次数。
private transient HoldCounter cachedHoldCounter;
// 用于每个线程读取保持计数的计数器。作为线程本地维护;缓存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
// 线程id(不使用引用,避免产生垃圾)
final long tid = getThreadId(Thread.currentThread()); // 调用unsafe获取线程Id
}
// 当前线程持有的可重入读锁的数量。
// 仅在构造函数和readObject中初始化。
// 当线程的读保持计数下降到0时删除。
private transient ThreadLocalHoldCounter readHolds;
// ThreadLoca!HoId Counter 继承了ThreadLocal ,因而initialValue 方法返
// 回一个Hold C ounter 对象。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
写锁(WriteLock)的获取与释放
lock()
- 写锁是个独占锁,某时只有一个线程可以获取该锁。
- 如果当前没有线程获取到读锁和写锁, 则当前线程可以获取到写锁然后返回。
- 如果当前己经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。
写锁是可重入锁,如果当前线程己经获取了该锁,再次获取后把可重入次数加1后直接返回。
public void lock() { sync.acquire(1); // 调用AQS的acquire方法 }
protected final boolean tryAcquire(int acquires) { /* * 流程: * 1. 如果读锁重入的次数不是0或者写锁的重入次数不是0,并且锁的持有者是其他线程,获取失败 * 2. 重入次数超过阈值,获取失败 * 3. 如果是重入锁(没有超过重入阈值)或者队列规则(读写锁规则)允许获得锁,那么返回成功 */ // 当前线程 Thread current = Thread.currentThread(); // 当前锁的状态, int c = getState(); // 写锁的个数 int w = exclusiveCount(c); // c & EXCLUSIVE_MASK if (c != 0) { // 不等于0表示锁已经被某些线程获取 // 写锁为0表示已经有线程获取读锁 或者 当前线程不是写锁的持有者,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判断 写锁 可重入次数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 设置重入次数 setState(c + acquires); return true; } // 第一个写线程获取锁 c==0 if (writerShouldBlock() || // 非公平锁为false ,公平锁使用AQS#hasQueuedPredecessors判断 !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
lockInterruptibly
类似于lock() 方法,它的不同之处在于, 它会对中断进行响应,也就是当其他线程调用了该线程的inte1TUpt() 方法中断了当前线程时, 当前线程会抛出异常InterruptedException异常。
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
tryLock
public boolean tryLock( ) { return sync.tryWriteLock(); }
final boolean tryWriteLock() { Thread current = Thread.currentThread(); // 当前锁state int c = getState(); if (c != 0) { // 写锁个数 int w = exclusiveCount(c); // c & EXCLUSIVE_MASK // 写锁为0(有线程获取了读锁)或者当前线程不是写锁持有线程,返回失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 写锁重入数量已经达到阈值,返回失败 if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // c==0 锁没被持有,CAS操作 if (!compareAndSetState(c, c + 1)) return false; // 设置锁的持有者为当前线程 setExclusiveOwnerThread(current); return true; }
tryLock(long timeout, TimeUnit unit)
与ryLock的不同之处在于,多了超时时间参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回false 。
另外,该方法会对中断进行响应, 也就是当其他线程调用了该线程的inte汀upt()方法中断了当前线程时,当前线程会抛出Inte盯uptedException 异常。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
unlock
public void unlock() {
sync.release(1); // 调用AQS#release
}
protected final boolean tryRelease(int releases) {
// 当前线程是否拥有写锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); // 没有则抛出异常
// 更新state状态,state -1
int nextc = getState() - releases;
// 更新完state 写锁是否 ==0
boolean free = exclusiveCount(nextc) == 0;
// 如采写锁可重入值为0 则释放锁,否则只是简单地更新状态值
if (free)
setExclusiveOwnerThread(null); // 等于0 释放锁
// 更新state
setState(nextc);
return free;
}
读锁(ReadLock)的获取与释放
lock
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁, AQS 的状态值state 的高16 位的值会增加l ,然后方法返回。否则如果其他一个线程持有写锁, 则当前线程会被阻塞。
public void lock() {
sync.acquireShared(1); // 调用AQS#acquireShared
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // tryAcquireShared 由子类实现
// 调用AQS#doAcquireShared
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
// 当前线程
Thread current = Thread.currentThread();
// state值
int c = getState();
// 如果有写锁 同时 当前线程不是锁的持有者,返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 持有读锁的线程数
int r = sharedCount(c);
// 尝试获取锁, 多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
if (!readerShouldBlock() &&
r < MAX_COUNT && // 读锁线程是否达到最大值
compareAndSetState(c, c + SHARED_UNIT)) { //CAS 操作将AQS 状态值的高16 位值增加l
// 第一个线程获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如采当前线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// 记录最后一个获取读锁的线程或记录其他线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 类似tryAcquireShared ,但是是自旋获取
return fullTryAcquireShared(current);
}
unlock
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 由子类实现 tryReleaseShared
// 调用AQS#doReleaseShared
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
// 当前线程
Thread current = Thread.currentThread();
// 当前线程是否为第一个获得读锁线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 重入数减1
if (firstReaderHoldCount == 1)
firstReader = null; // 持有数为1则第一个读锁线程为Null
else
firstReaderHoldCount--;
} else {
// 最后一个读锁线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// ,首先获取当前AQS 状态值并将其保存到变量c ,然后变量c 被减去一个读计数单位后使用CAS 操作更新AQS 状态值,
// 如果更新成功则查看当前AQS 状态值是否为0 ,为0 则说明当前己经没有读线程占用读锁,则tryReleaseShared 返回true
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
StampedLock
StampedLock 是并发包里面JDK8 版本新增的一个锁,该锁提供了三种模式的读写控
制,当调用获取锁的系列函数时,会返回一个long 型的变量,我们称之为戳记(stamp),这个戳记代表了锁的状态。
其中try系列获取锁的函数,当获取锁失败后会返回为0 的stamp 值。当调用释放锁和转换锁的方法时需要传入获取锁时返回的stamp 值。
StampedLock三种读写模式的锁:
写锁 - writeLock
StampedLock#writeLock排它锁,类似ReentrantReadWriteLock#WriteLock,但是不是可重入锁。请求锁成功会返回一个stamp变量表示锁的版本;释放锁时需要调用unlockWrite方法并传递获取锁时的stamp参数。另外tryWriteLock为非阻塞方法。
悲观读锁 - readLcok
StampedLock#readLock是一个共享锁,类似ReentrantReadWriteLock#ReadLock,但是不是可重入锁。具体操作数前会多数据进行加锁,这是对写多读少常见的一种考虑。获取锁成功是返回一个stamp变量表示当前锁的版本,释放锁时需要调用unlockRead方法并传入stamp参数。另外tryReadLock为非阻塞方法。
乐观读锁 - tryOptimisticRead
相对readLock,在操作数据前并没有通过CAS设置锁的状态,仅仅通过位运算测试。如果没有线程持有写锁,则返回0的stamp版本信息。获取stamp后在具体操作数据前还需要调用validate方法验证stamp是否已经不可用(期间是否有其他线程持有了writeLock),不可用返回0。tryOptimisticRead适合读多写少的场景,因为没有真正加锁,所有效率会比较高。
StampedLock还支持三种锁在一定条件下相互转换。
例如longConvertToWriteLock(long stamp) 期望把stamp标识的锁升级为写锁,成功会返回一个有效的stamp(以下情况)
- 当前锁已经是写锁
- 当前锁时readLock,并且没有其他线程是readLock模式
- 当前锁时tryOptimisticRead,并且当前writeLock可用
stampedLock都是不可重入锁,所以在获取锁后释放锁前不应该再调用会获取锁的操作,以避免造成调用线程被阻塞。
public class Point {
private double x, y;
private final StampedLock stampedLock = new StampedLock();
void move(double dataX, double dataY) {
// 获取写锁(排它锁)
long stamp = stampedLock.writeLock();
try {
x += dataX;
y += dataY;
} finally {
stampedLock.unlockWrite(stamp);
}
}
double distanceFromOrigin() {
// 尝试获取乐观读锁
long stamp = stampedLock.tryOptimisticRead();
// 将全部变量复制待方法栈中
double currentX = x, currentY = y;
// 检查是否期间有其他线程获取写锁
if (!stampedLock.validate(stamp)) {
// 如果有线程获取,则获取一个读锁
stamp = stampedLock.readLock();
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return Math.sqrt(Math.pow(currentX, 2) + Math.pow(currentY, 2));
}
void moveIfAtOrigin(double dataX, double dataY) {
// 获取悲观读锁
long stamp = stampedLock.readLock();
try {
while (BigDecimal.valueOf(dataX).equals(BigDecimal.ZERO) && BigDecimal.valueOf(dataY).equals(BigDecimal.ZERO)) {
// 将读锁升级为写锁
long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
// 升级成功
if (writeStamp != 0L) {
stamp = writeStamp;
x = dataX;
y = dataY;
break;
} else {
stampedLock.unlockRead(stamp);
stamp = stampedLock.writeLock();
}
}
} finally {
stampedLock.unlockWrite(stamp);
}
}
}