关于AQS这个概念,个人是最近才知道。本篇文章只是简单的整理一下知识内容。
简介
- AQS是
AbstractQueuedSynchronizer这个JDK源码中的抽象类名的缩写 - JDK8的API手册定义其为一个框架。说白了就是使用了模板方法模式,定义了一个同步器实现的算法步骤,算法具体实现,延迟到子类去实现
 ReentrantLock、Semaphore等类都是依靠这个框架(继承了这个类)来实现的AQS数据结构
- status:资源标识
 - CLH锁-队列:CLH是三个人名字的首字母,CLH锁时自旋锁
 

- 
资源共享的两种模式
 独占式(Exclusive):如ReentrantLock
共享模式(Share):如CountDonwLatch、Semaphore
Node数据结构
Node的队列中每个节点的数据结构,每个节点Node代表的就是一个等待或占用线程
/*** Wait queue node class.** <p>The wait queue is a variant of a "CLH" (Craig, Landin, and* Hagersten) lock queue. CLH locks are normally used for* spinlocks. We instead use them for blocking synchronizers, but* use the same basic tactic of holding some of the control* information about a thread in the predecessor of its node. A* "status" field in each node keeps track of whether a thread* should block. A node is signalled when its predecessor* releases. Each node of the queue otherwise serves as a* specific-notification-style monitor holding a single waiting* thread. The status field does NOT control whether threads are* granted locks etc though. A thread may try to acquire if it is* first in the queue. But being first does not guarantee success;* it only gives the right to contend. So the currently released* contender thread may need to rewait.** <p>To enqueue into a CLH lock, you atomically splice it in as new* tail. To dequeue, you just set the head field.* <pre>* +------+ prev +-----+ +-----+* head | | <---- | | <---- | | tail* +------+ +-----+ +-----+* </pre>** Node的类的这段注释就解释了为啥叫CLH锁队列。* 因为新Node进入队列时使用CLH锁的方式(自旋锁的方式)***/static final class Node {/*** Marker to indicate a node is waiting in shared mode* 标记一个节点(一个线程)在共享模式下等待** */static final Node SHARED = new Node();/*** Marker to indicate a node is waiting in exclusive mode* 标记一个节点在独占模式下等待* */static final Node EXCLUSIVE = null;/*** waitStatus value to indicate thread has cancelled** 表示该结点已被取消** */static final int CANCELLED = 1;/**** waitStatus value to indicate successor's thread needs unparking* 标记后继的节点需要被唤醒* */static final int SIGNAL = -1;/**** waitStatus value to indicate thread is waiting on condition* 表示该结点等待某个条件** */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate* waitStatus的值,表示有资源可用,* 新head结点需要继续唤醒后继结点* (共享模式下,多线程并发释放资源,* 而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)***/static final int PROPAGATE = -3;/****/volatile int waitStatus;/** 前驱指针 */volatile Node prev;/** 后继指针 */volatile Node next;/*** Node所代表的线程实例**/volatile Thread thread;/**** 要么是标记共享模式还是独占模式* 要么就是记录下一个等待条件的Node*/Node nextWaiter;/*** 是否共享模式*/final boolean isShared() {return nextWaiter == SHARED;}/*** 前节点获取* @return the predecessor of this node*/final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}/*** 这里 第二的Node就是指明是 共享模式还是独占模式* @param thread* @param mode*/Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
为啥叫CLH队列?
因为这个队列新增节点是通过CLH锁(一种自旋锁)的方式去入队列的。
Node类的几种waitStatus

| 状态 | 状态值 | 描述 | 
|---|---|---|
| INITAL | 0 | 初始化状态 | 
| CANCELLED | 1 | 当前节点已取消。 | 
| SIGNAL | -1 | 后继节点需要被唤醒 | 
| CONDITION | -2 | 节点等待一个Condition | 
| PROPAGATE | -3 | 共享模式中标识节点可运行? 标识资源可用,需要唤醒后继节点  | 
- 额,额,额,先记录着,后面再具体理解
 
Node类的入队列流程
这里我就不贴代码了。可以看一下涛哥笔记 yyds好吧。
- 尝试直接CAS将节点插入队列
 - 失败的话循环CAS插入
 
AQS的主要方法解析
AQS使用设计模式中的模板方法模式,自然会留给子类去实现的一些方法。
isHeldExclusively():该线程是否正在独占资源。用到Condition时需要实现方法- 独占锁
 
acquire(int arg):独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列(一致等待)acquireInterruptibly(int arg):独占式获取同步状态(同上),如果被打断直接抛异常tryAcquire(int arg):独占式获取同步状态(供开发者重写)tryAcquireNanos(int arg,long nanosTimeout):独占式获取同步状态,增加等待时间限制release(int arg):独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒tryRelease(int arg):独占式释放同步状态(供开发者重写)- 共享锁
 
类似
tryAcquire方法由开发者自己实现,但是抽象类中一般也实现这个方法,但是只是抛异常,这样做的目的:
- 避免子类把不需要的方法实现一遍
 
获取资源方法分析
总的来说是下面流程
- 线程A先获取尝试获取资源,成功后线程A执行
 - 获取失败,线程A转换为Node加入等待队列
 - 线程A的Node循环尝试获取资源
 - 在某次尝试获取失败后,发现可以中断线程了,进行中断操作
 
acquire(int arg)
/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** 是一个独占模式,忽略中断,* 通过至少一次的tryAcquire方法实现* 或者进入队列重复的阻塞和不阻塞, 不阻塞时调用tryAcquire 直到成功**** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {//首次try没成功, 入队列,之后又调用acquireQueued方法。if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
acquireQueued(Node node,int arg)
- 循环试着去拿资源
 每次失败后,会看看是否可以直接Waiting
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** 以独占和不可中断的模式** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取节点的前节点final Node p = node.predecessor();//若前节点是头节点,再试着获取一次if (p == head && tryAcquire(arg)) {//成功的话,我就是新王setHead(node);//前节点出队列p.next = null; // help GCfailed = false;return interrupted;}//shouldParkAfterFailedAcquire:判断是否可以让线程Wating// parkAndCheckInterrupt()并且尝试中断if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 循环结束仍然是失败,则取消请求if (failed)cancelAcquire(node);}}
shouldParkAfterFailedAcquire(Node pred, Node nowNode)
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** 如果Node应该被阻塞** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.** 前节点如果是标记了后继节点唤醒,则返回true**/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.* 前节点如果被取消了,则一直追溯到前面的不为取消的节点*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);//中间一段已取消的节点全部剔除队列pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.** 我们需要前面一个节点是 -1 状态, 否则我们不能 阻塞**/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
parkAndCheckInterrupt()
/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
LockSupport.park: 线程进入Waiting状态
- LockSupport.unPark: 线程恢复
 
释放资源分析
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** 尝试释放成功后,获取队列头节点。状态非初始化且不为空时* unpark后继节点** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;//当前节点小于0,将当前节点置为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.* 当前节点的下一个节点为 取消状态的节点时或不存在,* 从后向前遍历出一个 可以唤醒的节点***/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
- 有意思的是当可以释放资源节点的下一个节点无效时,使用从尾向前遍历找有效节点
 
参考文章
RedSpider《深入浅出多线程》- http://concurrent.redspider.group/article/02/11.html 
雨润涛哥的私人笔记 - https://lvtao0408.github.io/2020/03/08/AQS/
