1、简介

在高并发下,CAS会恶性空自旋造成大量CPU资源浪费,解决CAS恶性空自旋的方式之一为空间换时间,较为常见的方法有两种,分散热点和队列削峰
JUC并发包是通过队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列削峰的抽象基类AbstractQueuedSynchronizer(抽象同步器类,简称AQS)

AQS是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。

有了AQS,构建线程协作类就容易多了。AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态

AQS实现类
image.png

2、AQS三要素

2.1、同步状态

AQS使用一个int类型的成员变量state来表示同步状态。State的具体含义会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余许可证的数量”,而CountDownLatch中,它表示“还需要倒数的数量”,ReentrantLcok中它表示锁的占有情况,包括可重入计数

2.2、控制线程抢锁和配合的FIFO队列

队列用来存放等待的线程,AQS是排队管理器,当多个线程争用同一把锁的时候,必须有排队机制将没有拿到锁的线程管理起来。当锁释放时,锁管理器就会挑选一个合适的线程来占用这个刚释放的锁。

image.png

2.3、期望协作类去实现的获取/释放等重要方法

这里的获取和释放方法是利用AQS的协作工具累里最重要的方法,是由协作类自己去实现的,并且含义各不相同。

获取方法

获取操作经常会依赖state变量,经常会阻塞

  • 在semaphore中,获取就是acquire方法,作用是获取一个许可证

  • 在CountDownLatch里面,获取就是await方法,作用就是等待,直到倒数结束

释放方法

释放操作不会被阻塞

  • 在Semaphore中,释放就是release方法,作用是释放一个许可证

  • 在CountDownLatch里面,获取就是CountDown方法,作用是倒数一个数

3、AQS源码解析

3.1、AQS用法

image.png

3.2、AQS在COuntDownLatch中的应用

image.png

构造函数

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }
  5. Sync(int count) {
  6. setState(count);
  7. }
  8. protected final void setState(int newState) {
  9. state = newState;
  10. }

getCount()

  1. public long getCount() {
  2. return sync.getCount();
  3. }
  4. int getCount() {
  5. return getState();
  6. }
  7. protected final int getState() {
  8. return state;
  9. }
  10. protected boolean tryReleaseShared(int releases) {
  11. // Decrement count; signal when transition to zero
  12. for (;;) {
  13. int c = getState();
  14. if (c == 0)
  15. return false;
  16. int nextc = c-1;
  17. if (compareAndSetState(c, nextc))
  18. return nextc == 0;
  19. }
  20. }

countDown()

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. //自旋和CAS倒数,倒数到0时返回true
  12. protected boolean tryReleaseShared(int releases) {
  13. // Decrement count; signal when transition to zero
  14. for (;;) {
  15. int c = getState();
  16. if (c == 0)
  17. return false;
  18. int nextc = c-1;
  19. if (compareAndSetState(c, nextc))
  20. return nextc == 0;
  21. }
  22. }
  23. //唤醒阻塞线程
  24. private void doReleaseShared() {
  25. /*
  26. * Ensure that a release propagates, even if there are other
  27. * in-progress acquires/releases. This proceeds in the usual
  28. * way of trying to unparkSuccessor of head if it needs
  29. * signal. But if it does not, status is set to PROPAGATE to
  30. * ensure that upon release, propagation continues.
  31. * Additionally, we must loop in case a new node is added
  32. * while we are doing this. Also, unlike other uses of
  33. * unparkSuccessor, we need to know if CAS to reset status
  34. * fails, if so rechecking.
  35. */
  36. for (;;) {
  37. Node h = head;
  38. if (h != null && h != tail) {
  39. int ws = h.waitStatus;
  40. if (ws == Node.SIGNAL) {
  41. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  42. continue; // loop to recheck cases
  43. unparkSuccessor(h);
  44. }
  45. else if (ws == 0 &&
  46. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  47. continue; // loop on failed CAS
  48. }
  49. if (h == head) // loop if head changed
  50. break;
  51. }
  52. }

await()

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. if (tryAcquireShared(arg) < 0)
  9. doAcquireSharedInterruptibly(arg);
  10. }
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. //把当前线程放入阻塞队列,并使线程陷入阻塞状态
  15. private void doAcquireSharedInterruptibly(int arg)
  16. throws InterruptedException {
  17. final Node node = addWaiter(Node.SHARED);
  18. boolean failed = true;
  19. try {
  20. for (;;) {
  21. final Node p = node.predecessor();
  22. if (p == head) {
  23. int r = tryAcquireShared(arg);
  24. if (r >= 0) {
  25. setHeadAndPropagate(node, r);
  26. p.next = null; // help GC
  27. failed = false;
  28. return;
  29. }
  30. }
  31. if (shouldParkAfterFailedAcquire(p, node) &&
  32. parkAndCheckInterrupt())
  33. throw new InterruptedException();
  34. }
  35. } finally {
  36. if (failed)
  37. cancelAcquire(node);
  38. }
  39. }
  40. private final boolean parkAndCheckInterrupt() {
  41. LockSupport.park(this);
  42. return Thread.interrupted();
  43. }
  44. public static void park(Object blocker) {
  45. Thread t = Thread.currentThread();
  46. setBlocker(t, blocker);
  47. UNSAFE.park(false, 0L);
  48. setBlocker(t, null);
  49. }

3.3、AQS在Semaphore中的应用

image.png

acquire()

  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. if (tryAcquireShared(arg) < 0)
  9. doAcquireSharedInterruptibly(arg);
  10. }
  11. protected int tryAcquireShared(int acquires) {
  12. for (;;) {
  13. if (hasQueuedPredecessors())
  14. return -1;
  15. int available = getState();
  16. int remaining = available - acquires;
  17. if (remaining < 0 ||
  18. compareAndSetState(available, remaining))
  19. return remaining;
  20. }
  21. }
  22. private void doAcquireSharedInterruptibly(int arg)
  23. throws InterruptedException {
  24. final Node node = addWaiter(Node.SHARED);
  25. boolean failed = true;
  26. try {
  27. for (;;) {
  28. final Node p = node.predecessor();
  29. if (p == head) {
  30. int r = tryAcquireShared(arg);
  31. if (r >= 0) {
  32. setHeadAndPropagate(node, r);
  33. p.next = null; // help GC
  34. failed = false;
  35. return;
  36. }
  37. }
  38. if (shouldParkAfterFailedAcquire(p, node) &&
  39. parkAndCheckInterrupt())
  40. throw new InterruptedException();
  41. }
  42. } finally {
  43. if (failed)
  44. cancelAcquire(node);
  45. }
  46. }

3.4、AQS在ReentrantLock中的应用

image.png

unlock()

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. public final boolean release(int arg) {
  5. if (tryRelease(arg)) {
  6. Node h = head;
  7. if (h != null && h.waitStatus != 0)
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }
  13. protected final boolean tryRelease(int releases) {
  14. int c = getState() - releases;
  15. if (Thread.currentThread() != getExclusiveOwnerThread())
  16. throw new IllegalMonitorStateException();
  17. boolean free = false;
  18. if (c == 0) {
  19. free = true;
  20. setExclusiveOwnerThread(null);
  21. }
  22. setState(c);
  23. return free;
  24. }
  25. //唤醒线程
  26. private void unparkSuccessor(Node node) {
  27. /*
  28. * If status is negative (i.e., possibly needing signal) try
  29. * to clear in anticipation of signalling. It is OK if this
  30. * fails or if status is changed by waiting thread.
  31. */
  32. int ws = node.waitStatus;
  33. if (ws < 0)
  34. compareAndSetWaitStatus(node, ws, 0);
  35. /*
  36. * Thread to unpark is held in successor, which is normally
  37. * just the next node. But if cancelled or apparently null,
  38. * traverse backwards from tail to find the actual
  39. * non-cancelled successor.
  40. */
  41. Node s = node.next;
  42. if (s == null || s.waitStatus > 0) {
  43. s = null;
  44. for (Node t = tail; t != null && t != node; t = t.prev)
  45. if (t.waitStatus <= 0)
  46. s = t;
  47. }
  48. if (s != null)
  49. LockSupport.unpark(s.thread);
  50. }

lock()

  1. public void lock() {
  2. sync.lock();
  3. }
  4. FairSync
  5. final void lock() {
  6. acquire(1);
  7. }
  8. public final void acquire(int arg) {
  9. if (!tryAcquire(arg) &&
  10. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  11. selfInterrupt();
  12. }
  13. protected final boolean tryAcquire(int acquires) {
  14. final Thread current = Thread.currentThread();
  15. int c = getState();
  16. if (c == 0) {
  17. if (!hasQueuedPredecessors() &&
  18. compareAndSetState(0, acquires)) {
  19. setExclusiveOwnerThread(current);
  20. return true;
  21. }
  22. }
  23. else if (current == getExclusiveOwnerThread()) {
  24. int nextc = c + acquires;
  25. if (nextc < 0)
  26. throw new Error("Maximum lock count exceeded");
  27. setState(nextc);
  28. return true;
  29. }
  30. return false;
  31. }
  32. }
  33. final boolean acquireQueued(final Node node, int arg) {
  34. boolean failed = true;
  35. try {
  36. boolean interrupted = false;
  37. for (;;) {
  38. final Node p = node.predecessor();
  39. if (p == head && tryAcquire(arg)) {
  40. setHead(node);
  41. p.next = null; // help GC
  42. failed = false;
  43. return interrupted;
  44. }
  45. if (shouldParkAfterFailedAcquire(p, node) &&
  46. parkAndCheckInterrupt())
  47. interrupted = true;
  48. }
  49. } finally {
  50. if (failed)
  51. cancelAcquire(node);
  52. }
  53. }

4、利用AQS实现自己的latch门栓

  1. public class OneSHotLatch {
  2. private Sync sync = new Sync();
  3. public void await(){
  4. sync.acquireShared(0);
  5. }
  6. public void signal(){
  7. sync.releaseShared(0);
  8. }
  9. private class Sync extends AbstractQueuedSynchronizer{
  10. @Override
  11. protected int tryAcquireShared(int arg) {
  12. return (getState() == 1) ? 1 : -1;
  13. }
  14. @Override
  15. protected boolean tryReleaseShared(int arg) {
  16. setState(1);
  17. return true;
  18. }
  19. }
  20. public static void main(String[] args) throws InterruptedException {
  21. OneSHotLatch oneSHotLatch = new OneSHotLatch();
  22. ExecutorService executorService = Executors.newFixedThreadPool(10);
  23. Runnable runnable = () -> {
  24. System.out.println(Thread.currentThread().getName() + "尝试获取锁,获取失败则等待");
  25. oneSHotLatch.await();
  26. System.out.println(Thread.currentThread().getName() + "获取锁成功,继续运行");
  27. };
  28. IntStream.range(0,10).forEach(e -> executorService.submit(runnable));
  29. Thread.sleep(1000);
  30. oneSHotLatch.signal();
  31. new Thread(runnable).start();
  32. }
  33. }

5、AQS资料

11-10 AQS补充材料(选修).pdf