关于AQS这个概念,个人是最近才知道。本篇文章只是简单的整理一下知识内容。
简介
- AQS是
AbstractQueuedSynchronizer
这个JDK源码中的抽象类名的缩写 - JDK8的API手册定义其为一个框架。说白了就是使用了模板方法模式,定义了一个同步器实现的算法步骤,算法具体实现,延迟到子类去实现
ReentrantLock
、Semaphore
等类都是依靠这个框架(继承了这个类)来实现的AQS数据结构
- status:资源标识
- CLH锁-队列:CLH是三个人名字的首字母,CLH锁时自旋锁
-
资源共享的两种模式
独占式(Exclusive):如ReentrantLock
共享模式(Share):如CountDonwLatch、Semaphore
Node数据结构
Node的队列中每个节点的数据结构,每个节点Node代表的就是一个等待或占用线程
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* Node的类的这段注释就解释了为啥叫CLH锁队列。
* 因为新Node进入队列时使用CLH锁的方式(自旋锁的方式)
*
*
*/
static final class Node {
/**
* Marker to indicate a node is waiting in shared mode
* 标记一个节点(一个线程)在共享模式下等待
*
* */
static final Node SHARED = new Node();
/**
* Marker to indicate a node is waiting in exclusive mode
* 标记一个节点在独占模式下等待
* */
static final Node EXCLUSIVE = null;
/**
* waitStatus value to indicate thread has cancelled
*
* 表示该结点已被取消
*
* */
static final int CANCELLED = 1;
/**
*
* waitStatus value to indicate successor's thread needs unparking
* 标记后继的节点需要被唤醒
* */
static final int SIGNAL = -1;
/**
*
* waitStatus value to indicate thread is waiting on condition
* 表示该结点等待某个条件
*
* */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* waitStatus的值,表示有资源可用,
* 新head结点需要继续唤醒后继结点
* (共享模式下,多线程并发释放资源,
* 而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)
*
*
*/
static final int PROPAGATE = -3;
/**
*
*/
volatile int waitStatus;
/** 前驱指针 */
volatile Node prev;
/** 后继指针 */
volatile Node next;
/**
* Node所代表的线程实例
*
*/
volatile Thread thread;
/**
*
* 要么是标记共享模式还是独占模式
* 要么就是记录下一个等待条件的Node
*/
Node nextWaiter;
/**
* 是否共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 前节点获取
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
/**
* 这里 第二的Node就是指明是 共享模式还是独占模式
* @param thread
* @param mode
*/
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
为啥叫CLH队列?
因为这个队列新增节点是通过CLH锁(一种自旋锁)的方式去入队列的。
Node类的几种waitStatus
状态 | 状态值 | 描述 |
---|---|---|
INITAL | 0 | 初始化状态 |
CANCELLED | 1 | 当前节点已取消。 |
SIGNAL | -1 | 后继节点需要被唤醒 |
CONDITION | -2 | 节点等待一个Condition |
PROPAGATE | -3 | 共享模式中标识节点可运行? 标识资源可用,需要唤醒后继节点 |
- 额,额,额,先记录着,后面再具体理解
Node类的入队列流程
这里我就不贴代码了。可以看一下涛哥笔记 yyds好吧。
- 尝试直接CAS将节点插入队列
- 失败的话循环CAS插入
AQS的主要方法解析
AQS使用设计模式中的模板方法模式,自然会留给子类去实现的一些方法。
isHeldExclusively()
:该线程是否正在独占资源。用到Condition时需要实现方法- 独占锁
acquire(int arg)
:独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列(一致等待)acquireInterruptibly(int arg)
:独占式获取同步状态(同上),如果被打断直接抛异常tryAcquire(int arg)
:独占式获取同步状态(供开发者重写)tryAcquireNanos(int arg,long nanosTimeout)
:独占式获取同步状态,增加等待时间限制release(int arg)
:独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒tryRelease(int arg)
:独占式释放同步状态(供开发者重写)- 共享锁
类似
tryAcquire方法由开发者自己实现,但是抽象类中一般也实现这个方法,但是只是抛异常,这样做的目的:
- 避免子类把不需要的方法实现一遍
获取资源方法分析
总的来说是下面流程
- 线程A先获取尝试获取资源,成功后线程A执行
- 获取失败,线程A转换为Node加入等待队列
- 线程A的Node循环尝试获取资源
- 在某次尝试获取失败后,发现可以中断线程了,进行中断操作
acquire(int arg)
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* 是一个独占模式,忽略中断,
* 通过至少一次的tryAcquire方法实现
* 或者进入队列重复的阻塞和不阻塞, 不阻塞时调用tryAcquire 直到成功
*
*
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
//首次try没成功, 入队列,之后又调用acquireQueued方法。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireQueued(Node node,int arg)
- 循环试着去拿资源
每次失败后,会看看是否可以直接Waiting
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* 以独占和不可中断的模式
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取节点的前节点
final Node p = node.predecessor();
//若前节点是头节点,再试着获取一次
if (p == head && tryAcquire(arg)) {
//成功的话,我就是新王
setHead(node);
//前节点出队列
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire:判断是否可以让线程Wating
// parkAndCheckInterrupt()并且尝试中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 循环结束仍然是失败,则取消请求
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node nowNode)
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* 如果Node应该被阻塞
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*
* 前节点如果是标记了后继节点唤醒,则返回true
*
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 前节点如果被取消了,则一直追溯到前面的不为取消的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//中间一段已取消的节点全部剔除队列
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*
* 我们需要前面一个节点是 -1 状态, 否则我们不能 阻塞
*
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park: 线程进入Waiting状态
- LockSupport.unPark: 线程恢复
释放资源分析
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* 尝试释放成功后,获取队列头节点。状态非初始化且不为空时
* unpark后继节点
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//当前节点小于0,将当前节点置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 当前节点的下一个节点为 取消状态的节点时或不存在,
* 从后向前遍历出一个 可以唤醒的节点
*
*
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 有意思的是当可以释放资源节点的下一个节点无效时,使用从尾向前遍历找有效节点
参考文章
RedSpider《深入浅出多线程》- http://concurrent.redspider.group/article/02/11.html
雨润涛哥的私人笔记 - https://lvtao0408.github.io/2020/03/08/AQS/