1、简介
在高并发下,CAS会恶性空自旋造成大量CPU资源浪费,解决CAS恶性空自旋的方式之一为空间换时间,较为常见的方法有两种,分散热点和队列削峰。
JUC并发包是通过队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列削峰的抽象基类AbstractQueuedSynchronizer(抽象同步器类,简称AQS)
AQS是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。
有了AQS,构建线程协作类就容易多了。AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态
AQS实现类
2、AQS三要素
2.1、同步状态
AQS使用一个int类型的成员变量state来表示同步状态。State的具体含义会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余许可证的数量”,而CountDownLatch中,它表示“还需要倒数的数量”,ReentrantLcok中它表示锁的占有情况,包括可重入计数
2.2、控制线程抢锁和配合的FIFO队列
队列用来存放等待的线程,AQS是排队管理器,当多个线程争用同一把锁的时候,必须有排队机制将没有拿到锁的线程管理起来。当锁释放时,锁管理器就会挑选一个合适的线程来占用这个刚释放的锁。
2.3、期望协作类去实现的获取/释放等重要方法
这里的获取和释放方法是利用AQS的协作工具累里最重要的方法,是由协作类自己去实现的,并且含义各不相同。
获取方法
获取操作经常会依赖state变量,经常会阻塞
在semaphore中,获取就是acquire方法,作用是获取一个许可证
在CountDownLatch里面,获取就是await方法,作用就是等待,直到倒数结束
释放方法
释放操作不会被阻塞
在Semaphore中,释放就是release方法,作用是释放一个许可证
在CountDownLatch里面,获取就是CountDown方法,作用是倒数一个数
3、AQS源码解析
3.1、AQS用法
3.2、AQS在COuntDownLatch中的应用
构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
protected final void setState(int newState) {
state = newState;
}
getCount()
public long getCount() {
return sync.getCount();
}
int getCount() {
return getState();
}
protected final int getState() {
return state;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
countDown()
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//自旋和CAS倒数,倒数到0时返回true
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
//唤醒阻塞线程
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//把当前线程放入阻塞队列,并使线程陷入阻塞状态
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
3.3、AQS在Semaphore中的应用
acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.4、AQS在ReentrantLock中的应用
unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//唤醒线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
lock()
public void lock() {
sync.lock();
}
FairSync:
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4、利用AQS实现自己的latch门栓
public class OneSHotLatch {
private Sync sync = new Sync();
public void await(){
sync.acquireShared(0);
}
public void signal(){
sync.releaseShared(0);
}
private class Sync extends AbstractQueuedSynchronizer{
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}
public static void main(String[] args) throws InterruptedException {
OneSHotLatch oneSHotLatch = new OneSHotLatch();
ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName() + "尝试获取锁,获取失败则等待");
oneSHotLatch.await();
System.out.println(Thread.currentThread().getName() + "获取锁成功,继续运行");
};
IntStream.range(0,10).forEach(e -> executorService.submit(runnable));
Thread.sleep(1000);
oneSHotLatch.signal();
new Thread(runnable).start();
}
}