定义
队列同步器,构建锁和其他同步类的基础组件。它是JUC并发包下的核心基础组件。AQS解决了子类实现同步器时的大量细节问题,如获取同步状态,FIFO同步队列。AQS不仅减少了大量的实现工作,也不必处理在多个位置发生的竞争问题。在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换造成的开销,提高了吞吐量,同时在设计AQS时考虑到了可伸缩性。
AQS内部方法
AQS本身实现的方法
- acquire(int arg): 独占式的获得锁,独占式获取同步状态都调用这个方法,通过子类实现的tryAcquire方法判断是否获得到了锁。
- acquireShared(int arg): 共享式的获得锁,例如读写锁,通过子类实现的tryAcquireShared方法判断是否获得锁
- release(int arg): 独占式的释放锁,通过子类的tryRelease方法判断是否释放了锁
releaseShared(int arg): 共享式的释放锁,通过子类的tryReleaseShared方法判断是否释放了锁
AQS子类实现的方法
tryAcquire(int arg):独占式的获取锁,返回值是boolean类型的,true代表获取锁,false代表获取失败。
- tryRelease(int arg):释放独占式同步状态,释放操作会唤醒其后继节点获取同步状态。
- tryAcquireShared(int arg):共享式的获取同步状态,返回大于0代表获取成功,否则就是获取失
- tryReleaseShared(int arg):共享式的释放同步状态。
- isHeldExclusively():判断当前的线程是否已经获取到了同步状态。
源码分析以及原理
AQS内部维护了一个Node节点类和一个ConditionObject类。
Node类维护了一个双向的FIFO队列,用来保存阻塞中的线程和获取同步状态的线程,ConditionObject对应的是Lock的等待通知机制。
waitstatus
Node节点类的volatile修饰的属性,代表同步节点的等待状态:
- initial:值为0,代初始状态值,表示当前没有线程获得锁
- cancelled:值为1 ,由于超时或者中断,节点被设置为取消状态。被设置为取消状态的节点不应该再去竞争锁,它应该一直是被取消的状态,不能变为其他的状态。处于这种状态的节点会被踢出队列,等待着被GC。
- signal:值为-1,后继节点的线程处于等待状态,当前节点的线程释放了同步状态或者取消,会通知后继节点去获取锁
- Condition:值为-2,节点在等待队列中,节点线程等待在Condition,当其它线程对Condition调用了signal方法,该节点会从等待队列移到同步队列中。
- propagate:值为-3,表示下一次共享式同步状态获取将会被无条件的被传播下去(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态)
state
AQS类的volatile修饰的属性,代表锁的状态
acquire(int arg):
独占式的获得锁,尝试获得锁,获取失败,加入队列
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSED),arg)){
//如果这个过程出现中断,那在整个过程之后再自我中断
selfInterrupt();
}
}
addWaiter(Node mode)
往同步队列中添加元素
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(),mode);
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred,node)){//CAS设置尾节点保证线程的安全性
pred.next = node;
return node;
}
}
enq(node);//防止CAS操作失败,再次处理入队列
return node;
}
addWaiter()方法主要是创建一个节点,通过CAS的方法添加到队列的尾部。最后再次处理入队列是以防CAS操作失败。
enq(Node node)
防止CAS失败,重入队列
private Node enq(Node node){
for(;;){
Node t = tail;
if(t == null){
if(compareAndSetHead(new Node())){//创建一个新的节点并添加到队列初始化
tail - head;
}
}else{
node.prev = t;
if(compareAndSetTail(t,node)){
t.next = node;
return t;
}
}
}
}
acquireQueued(Node node,int arg)
当线程获取锁失败,并加入到同步队列中以后,就进入到一个自旋的状态,如果获取到这个状态就退出阻塞,否则一直阻塞。
final boolean acquireQueued(final Node node,int arg){
boolean faild = true;//是否获取了同步状态
try{
boolean interrupted = false;//获取状态过程中是否被中断
for(;;){
final Node p = node.predecessor();//前继节点
if(p == head && tryAcquire(arg)){//如果前继节点是头节点并且尝试获取锁成功
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//判断自己是否已经阻塞了检查这个过程中是否被阻塞过
if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally{
if(failed){
cacelAcquire(node);
}
}
}
acquireQueued方法主要是让线程通过自旋去获取同步状态,因为使用的是FIFO队列,所以只有头结点的后继节点才有资格去获取同步状态,如果线程可以休息了就让该线程休息并记录过程中是否被中断过。当线程获取了同步状态就会从同步队列中移除这个节点。同时还会设置获取同步状态的这个节点(线程)为头节点。在设置头节点的时候不需要任何同步操作,因为独占锁中能获取同步状态的一定是同一线程。
shouldParkAfterFailedAcquire(Node node,Node node)
判断一个线程是否阻塞
private static boolean shouldPArkAfterFailedAcquire(Node pred,Node node){
int ws = pred.waitStatus;//获取节点的等待状态
if(ws == Node.SIGNAL){//如果是SIGNAL就代表当头节点释放后,这个节点就会去尝试获取状态
return true;//代表阻塞中
}
if(ws > 0){//代表前继节点放弃了
do {
node.prev = pred = pred.prev;//循环不停的往前找知道找到节点的状态是正常的
}while(pred.waitStatus > 0 );
pred.next = node;
}else{
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);//通过CAS操作设置状态为SIGNAL
}
return false;
}
parkAndCheckInterrupt()
前面的方法是判断是否阻塞,而这个方法就是真正的执行阻塞的方法同时返回中断状态
private final boolean parkAndCheckInterupt(){
LockSupport.park(this);//阻塞当前线程
return Thread.interrupted();//返回中断状态
}
acquire整体流程
- 首先通过子类判断是否获取了锁,获取不到就加入到同步队列中
- 当线程在队列中通过不停的自旋去获取同步状态。如果获取到了锁,就把其设为同步队列的头部,否则在队列中不停的自旋来获取同步状态。
- 如果获取同步状态的过程中被中断过则自行调用interrupt方法自我中断
release(int arg)
独占式的释放锁
public final boolean release(int arg){
if(tryRelease(arg)){
Node h = head;//获取头结点
if(h != null && h.waitStatus != 0){//如果头结点不为空并且头结点的waitStatus不为0(0代表资源没有线程加锁)
unparkSuccessor(h);//唤醒下一个节点
}
return true;
}
return false;
}
unparkSuccessor(Node node)
唤醒后继节点获取同步状态
private void unparkSuccessor(Node node){
int ws = node.waitStatus;
if(ws < 0){
compareAndSetWaitStatus(node,ws,0);//通过CAS将头结点的状态设置为初始状态
}
Node s = node.next;
if(s == null || s.waitSattus > 0){//不存在或者已经取消
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){//从尾节点开始往前遍历,寻找离头结点最近的并且是正常状态的节点
s = t;
}
}
}
if(s != null){
lockSupport.unpark(s.thread);
}
}