AQS概述
AQS是什么?
谈到Java并发编程,肯定离不开ReentrantLock 这个即可中断又可重入的独占锁,而ReentrantLock的底层实现又离不开AQS,AQS的实现又用到了CAS,CAS操作的实现又借用了一个UnSafe对象…..这里我们不跪扯这么多,我们先来理解一下这个AQS到底是什么东东,为什么说ReentrantLock、信号量等就是基于AQS实现的….
首先AQS全名AbstractQueuedSynchronizer,是[并发]容器J.U.C(java.util.concurrent)下locks包内的一个 抽象的[队列]式同步器(一个抽象父类) ,是除了java自带的synchronized 关键字之外的锁机制。
AQS的核心思想是什么?
AQS的核心思想是如果被请求的共享资源(锁)空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态;
如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
通俗来说,
AQS的核心思想是基于CLH双向虚拟队列,维护一个用volatile修饰的共享变量state,尝试加锁的线程通过CAS操作去改变state状态符,成功则获取锁成功,失败则将请求共享资源(锁)的线程封装成一个CLH双向锁队列的一个结点(Node)添加进等待队列,等待被唤醒。
如下图所示,
AQS维护了一个volatile关键字修饰的 int state 成员变量来表示同步状态, 和一个FIFO线程等待队列 来完成获取资源线程的排队工作 ,即多个线程争用state资源被阻塞的时候就会进入这个队列。
当线程需要加锁的时候首先会尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync queue里。而当持有锁的线程释放锁的时候,会唤醒队列中的后继节点绑定的线程。所以在当前节点为对头节点head的直接后继时会不断的循环尝试获取锁。
如果获取锁失败且自己不是队头节点(不是后继节点的话)就会阻塞自己直到自己被唤醒。
关于这个共享资源state状态信息的访问方式有如下三种:( 都是protected类型方法 )
- getState();
- setState();
- compareAndSetState(); //用cas的方式更新state的值
使用了AQS的类有哪些?
实现了AQS的锁有:自旋锁、互斥锁、读锁写锁、条件产量、信号量、栅栏都是AQS的衍生物。
我们常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的。具体用法是在其内部定义一个内部类继承AQS,并实现其模板方法,来达到同步状态的管理。
且AQS 定义了两种资源共享方式: 不同的自定义的同步器争用共享资源的方式也不同
- Exclusive:独占,只有一个线程能执行,如ReentrantLock
Share:共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
小结AQS的特点有哪些?
综上所述,AQS有如下特点
AQS管理一个关于状态信息的单一整数state,该整数可以表现任何状态消息。比如,Semaphore
用它来表现剩余的许可数,ReentrantLock
用它来表现拥有它的线程已经请求了多少次锁;FutureTask
用它来表现任务的状态(尚未开始、运行、完成和取消)用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
自定义锁
有了上面对AQS的理解,现在我们来尝试一下自定义一个简单的不可重入锁试试,这样我们就可以通过实现一个简单的不可重入锁来加深我们对AQS的理解。
之前我们就说了,ReentrantLock 的底层实现用到了AQS,它是一个独占式的可重入锁,我们可以参考着它的实现来自定义一个属于我们自己的不可重入锁;
首先我们打开ReentrantLock的源码,发现里面有一个Sync的内部类继承了AQS同步器,而且ReentrantLock的大部分功能就是调用该内部类的方法来实现功能的,比如说ReentrantLock的lock方法,所以我们可以仿照ReentrantLock的实现方式来实现一个简单的不可重入的锁;
实现一个自定义不可重入锁
我们这里的目的是通过模仿ReentrantLock 来实现一个自定义的不可重入锁,达到加深对AQS的理解的目的,而且我们要实现的自定义不可重入锁的大部分功能都依赖于自定义的同步器类完成的。
ReentrantLock 中的同步器实现是定义在ReentrantLock 内部中作为其一个内部类的,为什么要做为一个内部类这是有说法的,所以我们这里也可以将其声明为内部类的形式。
我们自定义的同步器的设计是基于设计模式中的模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 自定义同步器需要继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
- 将AQS思想组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
我们为这里的自定义同步器在实现的时候只需要实现对共享资源state的获取和释放方式即可,至于具体线程等待队列的维护,AQS已经在顶层实现好了,这里我们先不去管它。
所以我们自定义同步器实现的时候主要实现下面几种方法:
1、isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
2、tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
3、tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
4、tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
5、tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以我们熟悉的ReentrantLock为例,它是一个可重入的独占式锁:同步状态state初始化为0,表示未锁定状态,A线程执行lock()时,会调用tryAcquire()独占锁并用CAS操作将state+1,之后其他线程再想tryAcquire的时候就会失败,直到A线程执行unlock()将state=0为止,其他线程才有机会获取该锁。
A释放锁之前,自己也是可以重复获取此锁(state累加),这就是可重入的概念。注意:获取多少次锁就要释放多少次锁,保证state是能回到零态的。
以CountDownLatch为例,任务分N个子线程去执行,state就初始化 为N,表示支持N个线程并行执行,每个线程执行完之后执行一次countDown()方法,state就会用CAS操作减一。当N子线程全部执行完毕,state=0,会unpark()主调用线程,主调用线程就会从await()函数返回,继续之后的动作。
关于这个state我们先了解到这,现在继续使用AbstractQueuedSynchronizer自定义一个同步器来实现自定义不可重入锁! 不可重入是指加了锁之后连自己也会被挡住; 锁资源(state)只有两种状态:0:未被锁定;1:锁定。
同步类在实现时一般都将自定义同步器[sync]定义为内部类,供自己使用;而同步类自己(MyLock)则实现某个接口,对外服务。
同步器以及同步类的实现代码如下:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class AQS {
public static void main(String[] args) {
MyLock lock=new MyLock();
new Thread(()->{
lock.lock();
try {
System.out.println("locking1....");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("unlocking1...");
lock.unlock();
}
},"t1").start();
new Thread(()->{
lock.lock();
try {
System.out.println("locking2....");
}finally {
System.out.println("unlocking2...");
lock.unlock();
}
},"t2").start();
}
}
/**
* 自定义不可重入锁
*/
class MyLock implements Lock{
//自定义同步器,实现AQS抽象类;我们重写或定义了四个,其他大部分都是父类中的方法
class MySync extends AbstractQueuedSynchronizer{
@Override //尝试获取锁 我们这里实现的是简单的不可重入锁,参数arg暂时用不到
protected boolean tryAcquire(int arg) {
//如何才能获取到锁呢?之前将AQS的时候说到过,AbstractQueuedSynchronizer里面有一个state,初始值是0
//我们可以这样设定:当state的值是0时表示没有获取锁,1表示获取到锁
//compareAndSetState方法是AQS中的方法
if (compareAndSetState(0,1)) {
//修改成功,表示加锁成功,为了保险起见,将持有锁的线程设置为当前线程(跟Monitor的Owner类似)
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//不成功表示加锁失败,返回false;
return false;
}
@Override //释放锁
protected boolean tryRelease(int arg) {
//既然是释放锁,当然是将state的状态从1改为0咯
setExclusiveOwnerThread(null);
setState(0);
//但是由于state是被volatile修饰的,可以防止指令重排序,所以我们将修改state的语句放后面
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
//state为1的时候就表示持有独占锁
return getState() == 1;
}
/**
* 返回一个条件变量
* @return
*/
public Condition newCondition(){
return new ConditionObject();
}
}
//创建一个自定义同步器对象,我们要实现的自定义不可重入锁的大部分功能都依赖于自定义的同步器类完成的
//也就是说我们实现Lock接口中需要实现的几个方法都需要依赖同步器对象完成
private MySync sync=new MySync();
@Override //不可打断锁,且若果加锁失败,会进入队列中进行等待
public void lock() {
sync.acquire(1);//调用该方法获取锁
}
@Override //可中断锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override //只进行一次尝试加锁,加锁成功则返回true,否则返回false;不会进入等待队列等待
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override //只进行一次尝试加锁,在规定时间内加锁成功则返回true,否则返回false;
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override //解锁
public void unlock() {
sync.release(1);
}
@Override //创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
模拟一个AQS自定义锁
下面我们不通过继承AQS、实现Lock的方式自定义锁,我们通过模拟AQS来实现一个自定义锁;
class MyNewLock {
//用volatile修饰是为了保证在线程用cas将state状态修改后其他线程立刻感知到
private volatile int state=0;
//表示当前持有锁的线程
private Thread lockHolder;
private ConcurrentLinkedQueue<Thread> queue=new ConcurrentLinkedQueue<>();
//用cas操作尝试修改state状态进行加锁
private boolean tryAquire(){
Thread currentHolder = Thread.currentThread();
int state = getState();
//state为0表示此时还没有线程获取到锁
if (state==0){
//使用cas操作尝试将state的值修改,保证原子性且只有一个线程能够获取到锁
if (compareAndSwapInt(0,1)){
//state状态修改成功就表示加锁成功,将锁的持有者设置为当前线程
setLockHolder(currentHolder);
return true;
}
}
//否则加锁失败
return false;
}
public void lock(){
//1、尝试获取锁,这里我们规定state属性值为0表示没有获取到锁,1表示获取到锁;同时使用CAS操作保证只有一个线程能够获取到锁
if (tryAquire()){
return;
}
Thread currentThread = Thread.currentThread();
//加锁失败就将其加入到队列中进行阻塞等待
queue.add(currentThread);
//2、没有获取到锁就得停留在当前方法
for (;;){
//2、但是这里在执行了park方法后就阻塞住不会再循环到这里了,所以要在unlock方法执行unPark方法,
// 这样其他线程在释放锁时就可以将这个被阻塞的线程唤醒,执行这个if判断;所以得有一个线程安全的队列
//去维护这个被park住了的线程引用
if (queue.peek()==currentThread&&tryAquire()){
queue.poll();//取出队头
return;
}
//1、但是若果一直空旋的话,浪费资源,我们可以让其阻塞,但是不能让其一直阻塞,所以上面要判断是否获取锁成功
LockSupport.park(currentThread);
}
//3、其他线程释放锁后,能够再次获取锁
}
public void unlock(){
Thread currentThread = Thread.currentThread();
if (currentThread!=lockHolder){
throw new RuntimeException("你不是线程锁的持有者");
}
int state = getState();
//如果要实现公平锁的话,要看看队列中是否有正在等待的线程
if((queue.size()==0||queue.peek()==currentThread)&& compareAndSwapInt(state,0)){
System.out.println(currentThread.getName()+" : 解锁成功!");
setLockHolder(null);
//唤醒队列中的等待线程
Thread peek = queue.peek();
if (peek!=null){
LockSupport.unpark(peek);//唤醒
}
}
}
//利用unSafe提供的原子操作进行替换oldValue与newValue
public final boolean compareAndSwapInt(int oldValue,int newValue){
return UNSAFE.compareAndSwapInt(this,stateOffset,oldValue,newValue);
}
//获取Unsafe对象(可以反射获取)
private static final Unsafe UNSAFE=Unsafe.getUnsafe();
//偏移量
private static final long stateOffset;
static {
try {
stateOffset=UNSAFE.objectFieldOffset(MyNewLock.class.getDeclaredField("state"));
}catch (Exception e){
throw new Error();
}
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Thread getLockHolder() {
return lockHolder;
}
public void setLockHolder(Thread lockHolder) {
this.lockHolder = lockHolder;
}
}
如果要实现可重入的特性,可以在state状态上做文章,每重入一次就将state的值自增一次,解锁就自减一次;进行了几次lock就得unlock几次。
我们的ReentrantLock就是这样实现了,值得注意的是,ReentrantLock并没有直接去继承AQS,而是在其内部声明了一个AQS内部实现类,事实上JUC包下的实现类大多数都是选择在内部实现一个AQS内部类而不是直接继承AQS。
至于为什么是在内部类使用AQS同步器呢?这是因为外部的抽象行为可能会跟AQS不一样,比如说外部类可能需要去实现某个锁的接口,可以是独占锁、共享锁,这是两种不同的行为,在不同功能上可能还有行为上的差别,比如说独占锁还可以分为公平锁与非公平锁,我们需要在一个外部类中去实现公平与非公平两种模式的锁或者是重入与非重入锁。最好的方式就是将其抽象到一个内部类中,且由外部类去引用,外部类共享内部类进行组合使用;
AQS原理小结
1、AQS内部维护了一个用volatile修饰的共享变量state,尝试加锁的线程会通过CAS操作去改变state状态符,成功则获取锁成功,失败则将请求共享资源(锁)的线程封装成一个CLH双向锁队列的一个结点(Node)添加进等待队列,等待被唤醒。
AQS部分源码
Node
AQS的内部类,双向队列的节点实现。
如果获取同步状态state失败时,会将当前线程及等待信息等构建成一个Node,将Node放到FIFO队列里,同时阻塞当前线程,当持有锁的线程将同步状态state释放时,会把FIFO队列中的首节点唤醒,使其获取同步状态state。
/**
双向队列节点
*/
static final class Node {
//表示是共享还是独占
static final Node SHARED = new Node();
//独占、排他锁的标识
static final Node EXCLUSIVE = null;
//这个标识表示失效。因为超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,且取消状态不会再改变
static final int CANCELLED = 1;
//带有该标识标识表示当前节点的后继节点处于等待状态需要被唤醒,在当前节点释放了同步状态或者取消,会通知后继节点
static final int SIGNAL = -1;
//节点在等待队列中,节点绑定的线程等待在condition中,当其他线程对该condition调用signal方法后,该节点会从等待队列中进入同步队列,获取同步状态
static final int CONDITION = -2;
//等待状态
volatile int waitStatus;
//双向队列前一个节点
volatile Node prev;
//双向队列下一个节点
volatile Node next;
//当前Node节点绑定的线程
volatile Thread thread;
//返回前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
有了以上的大致了解,我们现在来看看FIFO的结构图
AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//双向队列的头节点
private transient volatile Node head;
//双向队列的尾节点
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
static final class Node {
//......
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/*
tryAcquire方法:
尝试去获取锁,获取成功返回true,否则返回false。该方法由继承AQS的子类自己实现。采用了模板方法设计模式。
如: ReentrantLock的Sync内部类,Sync的子类:NonfairSync和FairSync里面才是真正实现tryAcquire方法的具体逻辑
*/
/*
addWaiter方法:将Node节点存储到同步队列中去
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}