1、简介
在高并发下,CAS会恶性空自旋造成大量CPU资源浪费,解决CAS恶性空自旋的方式之一为空间换时间,较为常见的方法有两种,分散热点和队列削峰。
JUC并发包是通过队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列削峰的抽象基类AbstractQueuedSynchronizer(抽象同步器类,简称AQS)
AQS是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。
有了AQS,构建线程协作类就容易多了。AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态
AQS实现类
2、AQS三要素
2.1、同步状态
AQS使用一个int类型的成员变量state来表示同步状态。State的具体含义会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余许可证的数量”,而CountDownLatch中,它表示“还需要倒数的数量”,ReentrantLcok中它表示锁的占有情况,包括可重入计数
2.2、控制线程抢锁和配合的FIFO队列
队列用来存放等待的线程,AQS是排队管理器,当多个线程争用同一把锁的时候,必须有排队机制将没有拿到锁的线程管理起来。当锁释放时,锁管理器就会挑选一个合适的线程来占用这个刚释放的锁。

2.3、期望协作类去实现的获取/释放等重要方法
这里的获取和释放方法是利用AQS的协作工具累里最重要的方法,是由协作类自己去实现的,并且含义各不相同。
获取方法
获取操作经常会依赖state变量,经常会阻塞
在semaphore中,获取就是acquire方法,作用是获取一个许可证
在CountDownLatch里面,获取就是await方法,作用就是等待,直到倒数结束
释放方法
释放操作不会被阻塞
在Semaphore中,释放就是release方法,作用是释放一个许可证
在CountDownLatch里面,获取就是CountDown方法,作用是倒数一个数
3、AQS源码解析
3.1、AQS用法

3.2、AQS在COuntDownLatch中的应用

构造函数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}Sync(int count) {setState(count);}protected final void setState(int newState) {state = newState;}
getCount()
public long getCount() {return sync.getCount();}int getCount() {return getState();}protected final int getState() {return state;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
countDown()
public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}//自旋和CAS倒数,倒数到0时返回trueprotected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}//唤醒阻塞线程private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
await()
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}//把当前线程放入阻塞队列,并使线程陷入阻塞状态private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);UNSAFE.park(false, 0L);setBlocker(t, null);}
3.3、AQS在Semaphore中的应用

acquire()
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
3.4、AQS在ReentrantLock中的应用
unlock()
public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}//唤醒线程private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
lock()
public void lock() {sync.lock();}FairSync:final void lock() {acquire(1);}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
4、利用AQS实现自己的latch门栓
public class OneSHotLatch {private Sync sync = new Sync();public void await(){sync.acquireShared(0);}public void signal(){sync.releaseShared(0);}private class Sync extends AbstractQueuedSynchronizer{@Overrideprotected int tryAcquireShared(int arg) {return (getState() == 1) ? 1 : -1;}@Overrideprotected boolean tryReleaseShared(int arg) {setState(1);return true;}}public static void main(String[] args) throws InterruptedException {OneSHotLatch oneSHotLatch = new OneSHotLatch();ExecutorService executorService = Executors.newFixedThreadPool(10);Runnable runnable = () -> {System.out.println(Thread.currentThread().getName() + "尝试获取锁,获取失败则等待");oneSHotLatch.await();System.out.println(Thread.currentThread().getName() + "获取锁成功,继续运行");};IntStream.range(0,10).forEach(e -> executorService.submit(runnable));Thread.sleep(1000);oneSHotLatch.signal();new Thread(runnable).start();}}
