AbstractQueuedSynchronizer抽象类(以下简称AQS)是整个java.util.concurrent包的核心,该包中的大多数同步器都是基于AQS来构建的。
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable
CountDownLatch
用来控制一个或者多个线程等待多个线程。
维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

public class CountdownLatchExample {public static void main(String[] args) throws InterruptedException {final int totalThread = 10;CountDownLatch countDownLatch = new CountDownLatch(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("run..");countDownLatch.countDown();});}countDownLatch.await();System.out.println("end");executorService.shutdown();}}
run..run..run..run..run..run..run..run..run..run..end
CyclicBarrier
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。
CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。
CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}

public class CyclicBarrierExample {public static void main(String[] args) {final int totalThread = 10;CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("before..");try {cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.print("after..");});}executorService.shutdown();}}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
Semaphore
Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。
public class SemaphoreExample {public static void main(String[] args) {final int clientCount = 3;final int totalRequestCount = 10;Semaphore semaphore = new Semaphore(clientCount);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalRequestCount; i++) {executorService.execute(()->{try {semaphore.acquire();System.out.print(semaphore.availablePermits() + " ");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}});}executorService.shutdown();}}
2 1 2 2 2 2 2 1 2 2
同步状态
在AQS中维护了一个名叫state的字段,是由volatile修饰的,它就是所谓的同步状态:
private volatile int state;
与state字段的相关方法
| 方法名 | 描述 | 
|---|---|
| protected final int getState() | 获取state的值  | 
| protected final void setState(int newState) | 设置state的值  | 
protected final boolean compareAndSetState(int expect, int update) | 
使用CAS方式更新 state的值  | 
同步队列
AQS中维护了同步队列,这个队列的节点类Node被定义成了一个静态内部类,Node类中有一个Thread类型的字段,这表明每一个节点都代表一个线程。
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;}
任何想要获得锁的线程都需要来竞争同步状态,获得锁的线程可以继续业务流程的执行,而没有获得锁的线程会被放到一个FIFO的队列中去,等待再次竞争同步变量来获得锁,AbstractQueuedSynchronizer为每个没有获得锁的线程封装成一个Node再放到队列中去,AQS中定义一个头节点引用,一个尾节点引用。可以在队列上进行诸如插入和移除操作
private transient volatile Node head;private transient volatile Node tail;
静态变量 值 描述 Node.CANCELLED 1 节点对应的线程已经被取消了(我们后边详细会说线程如何被取消) Node.SIGNAL -1 表示后边的节点对应的线程处于等待状态 Node.CONDITION -2 表示节点在等待队列中(稍后会详细说什么是等待队列) Node.PROPAGATE -3 表示下一次共享式同步状态获取将被无条件的传播下去(稍后再说共享式同步状态的获取与释放时详细唠叨) 无 0 初始状态
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
 - SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
 - CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
 - PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
 - 0状态:值为0,代表初始化状态
 
独占模式Exclusive
在独占模式下,同一个时刻只能有一个线程获取到同步状态。其他同时去获取同步状态的线程会被包装成一个Node节点放到同步队列中,直到获取到同步状态的线程释放掉同步状态才能继续执行。在独占模式需要我们定义AQS的子类并且重写下边这些方法:
| 方法名 | 描述 | 
|---|---|
protected boolean tryAcquire(int arg) | 
独占式的获取同步状态,获取成功返回true,否则false | 
protected boolean tryRelease(int arg) | 
独占式的释放同步状态,释放成功返回true,否则false | 
protected boolean isHeldExclusively() | 
在独占模式下,如果当前线程已经获取到同步状态,则返回 true;其他情况则返回 false | 
AQS里定义了一些调用它们的方法,这些方法都是由public final修饰的:
| 方法名 | 描述 | 
|---|---|
void acquire(int arg) | 
独占式获取同步状态,如果获取成功则返回,如果失败则将当前线程包装成Node节点插入同步队列中。  | 
void acquireInterruptibly(int arg) | 
与上个方法意思相同,只不过一个线程在执行本方法过程中被别的线程中断,则抛出InterruptedException异常。  | 
boolean tryAcquireNanos(int arg, long nanos) | 
在上个方法的基础上加了超时限制,如果在给定时间内没有获取到同步状态,则返回false,否则返回 true。  | 
boolean release(int arg) | 
独占式的释放同步状态。 | 
acquire(int)
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
函数流程如下:
- tryAcquire()尝试直接去获取同步状态,如果返回true则结束;
 - addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
 - acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
 - 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。AQS是一个抽象类,在
tryAcquire只是抛出了异常,具体资源的获取/释放方式交由自定义同步器去实现 ```java protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 
<a name="ZN4di"></a>#### addWaiter(Node)此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点```javaprivate Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); //构造一个新节点Node pred = tail;if (pred != null) { //尾节点不为空,插入到队列最后node.prev = pred;if (compareAndSetTail(pred, node)) { //更新tail,并且把新节点插入到列表最后pred.next = node;return node;}}enq(node);return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { //tail节点为空,初始化队列if (compareAndSetHead(new Node())) //设置head节点tail = head;} else { //tail节点不为空,开始真正插入节点node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
acquireQueued
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //获取前一个节点if (p == head && tryAcquire(arg)) { 前一个节点是头节点再次尝试获取同步状态setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
共享模式Share
| 方法名 | 描述 | 
|---|---|
protected int tryAcquireShared(int arg) | 
共享式的获取同步状态,获取成功返回true,否则false | 
protected boolean tryReleaseShared(int arg) | 
共享式的释放同步状态,释放成功返回true,否则false | 
这个方法会调用我们自定义的AQS子类中的tryAcquireShared方法去获取同步状态,只不过tryAcquireShared的返回值是一个int值,该值不小于0的时候表示获取同步状态成功,则acquireShared方法直接返回;如果该返回值小于0的时候,表示获取同步状态失败,则会把该线程包装成Node节点插入同步队列
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
doAcquireShared(int)
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);//加入队列尾部boolean failed = true;//是否成功标志try {boolean interrupted = false;//等待过程中是否被中断过的标志for (;;) {final Node p = node.predecessor();//前驱if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的int r = tryAcquireShared(arg);//尝试获取资源if (r >= 0) {//成功setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程p.next = null; // help GCif (interrupted)//如果等待过程中被打断过,此时将中断补上。selfInterrupt();failed = false;return;}}//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
releaseShared(int)
释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
模板模式
AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:
- 资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)
 - 访问资源的线程如何进行并发管理?(等待队列)
 - 如果线程等不及资源了,如何从等待队列退出?(超时/中断)
 
这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。
我们所熟知的ReentrantLock、CountDownLatch、CyclicBarrier等同步器,其实都是通过内部类实现了AQS框架暴露的API,以此实现各类同步器功能。这些同步器的主要区别其实就是对同步状态(synchronization state)的定义不同。AQS框架将剩下的一个问题留给用户:什么是资源?如何定义资源是否可以被访问?
我们来看下几个常见的同步器对这一问题的定义:
| 同步器 | 资源的定义 | 
|---|---|
| ReentrantLock | 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数 | 
| CountDownLatch | 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。 | 
| Semaphore | 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。 | 
| ReentrantReadWriteLock | 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。 | 
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
Mutex(互斥锁)
Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。
class Mutex implements Lock, java.io.Serializable {// 自定义同步器private static class Sync extends AbstractQueuedSynchronizer {// 判断是否锁定状态protected boolean isHeldExclusively() {return getState() == 1;}// 尝试获取资源,立即返回。成功则返回true,否则false。public boolean tryAcquire(int acquires) {assert acquires == 1; // 这里限定只能为1个量if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源return true;}return false;}// 尝试释放资源,立即返回。成功则为true,否则false。protected boolean tryRelease(int releases) {assert releases == 1; // 限定为1个量if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);//释放资源,放弃占有状态return true;}}// 真正同步类的实现都依赖继承于AQS的自定义同步器!private final Sync sync = new Sync();//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。public void lock() {sync.acquire(1);}//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。public boolean tryLock() {return sync.tryAcquire(1);}//unlock<-->release。两者语文一样:释放资源。public void unlock() {sync.release(1);}//锁是否占有状态public boolean isLocked() {return sync.isHeldExclusively();}}
