简介

AQS:AbstractQueuedSynchronizer
JAVA的几乎所有锁(除synchronized,LockSupport外)都是用AQS实现的,比如ReentrantLock,CountdownLatch,CyclicBarrier,Semaphore等;

可以说AQS的底层就是CAS+volatile

  1. 1. state变量
  2. 使用int类型的volatile变量维护同步状态(state)
  3. 围绕state提供锁的两种操作“获取”和“释放=0”; 读锁与写锁区分 65535= 2^16-1
  4. FutureTask用它来表示任务的状态
  5. 2. 内置的同步队列 CLH,双端双向列表
  6. FIFO队列存放阻塞的等待线程,来完成线程的排队执行;
  7. 封装成NodeNode维护一个prev引用和next引用,实现双向链表
  8. AQS维护两个指针,分别指向队列头部head和尾部tail
  9. 锁是面向使用者的,定义了用户调用的接口,隐藏了实现细节;
  10. AQS是锁的实现者,屏蔽了同步状态管理,线程的排队,等待唤醒的底层操作。
  11. 锁是面向使用者,AQS是锁的具体实现者
  12. 背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了

底层是一个双向链表—>cas来维护 ; int常量state volatile修饰

  1. 属性private volatile int state;同步状态

这个state在不同的子类实现中有不同的含义,但都是通过这个state来控制”同步”的行为.这个state通过CAS来改变.

  1. AQS维护着Node的双向链表作为等待队列,这个等待队列是”CLH”(Craig, Landin, and Hagersten)队列的变体Node是AQS的内部类,Node里面的属性装着线程volatile Thread thread,和一个状态volatile int waitStatus.各个线程去争抢的过程其实就是这个CLH队列的入列和出列,这个过程的关键操作都是通过CAS来实现的

    通过ReentrantLock的lock()方法,去阅读理解AQS

    AbstractQueuedSynchronizer的state属性对于ReentrantLock的意义:
    state表示线程持有锁的次数:
    state=0表示没有线程持有锁;
    state>0表示有一个线程持有锁,重入了state次.

起初我是看的JDK8的源码,但是它和JDK13上有一些细节上的区别,JDK13更好理解一些.我还记下了JDK8和JDK11在AQS实现上的区别,后来想想没啥意义,还容易记忆混乱,就干脆只记录JDK13的实现方式了.既然学习嘛,那就学新的~

JDK8和新版的JDK(以11为例)实现的细节上有些不同,整体思路是一样的:
CAS修改state和Node节点入队列时,JDK8通过unsafe获取变量的内存地址偏移量,和JDK11通过变量句柄指向变量的地址.
按我的理解,这俩效果是一样的,JDK11的效率应该更高一些
以state为例:
JDK8用到了stateOffset.(为了允许将来的优化没有用AtomicInteger),这个stateOffset可以理解为state的”指针”(地址),可以通过stateOffsetCAS改变state。

  1. long stateOffset=unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  2. unsafe.compareAndSwapInt(this, stateOffset, expect, update);

JDK11用到了VarHandle,可以直接CAS改变state

  1. VarHandle STATE=MethodHandles.lookup()
  2. .findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
  3. STATE.compareAndSet(this, expect, update)

Unsafe是实现CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。Unsafe类提供了硬件级别的原子操作。

lock()的大致流程如下:

下面简称AbstractQueuedSynchronizer为AQS,AbstractOwnableSynchronizer为AOS
image.png

尝试阅读ReentrantLock的lock()方法

下面尝试着通过debugReentrantLock.lock()方法,去阅读源码.

  1. 首先随便写个方法,在lock.lock()处打断点,然后debug运行,追踪下去

    1. public static void main(String[] args) {
    2. ReentrantLock lock = new ReentrantLock();
    3. lock.lock();
    4. System.out.println("hahaha");
    5. lock.unlock();
    6. }
  2. 看到ReentrantLock的lock()方法

ReentrantLock.lock()方法注释的大致意思:
这个方法用于获得该锁;
如果该锁没有被线程持有,那么把锁的hold count设为1后直接返回;
如果当前线程已经持有该锁,则hold count加一,然后返回;
如果该锁被其他线程持有,那么当前线程将”阻塞”,直到获得锁,然后把hold count设为1后返回
这个lock hold count应该就是AQS的state

  1. /**
  2. * Acquires the lock.
  3. *
  4. * <p>Acquires the lock if it is not held by another thread and returns
  5. * immediately, setting the lock hold count to one.
  6. *
  7. * <p>If the current thread already holds the lock then the hold
  8. * count is incremented by one and the method returns immediately.
  9. *
  10. * <p>If the lock is held by another thread then the
  11. * current thread becomes disabled for thread scheduling
  12. * purposes and lies dormant until the lock has been acquired,
  13. * at which time the lock hold count is set to one.
  14. */
  15. public void lock() {
  16. sync.acquire(1);
  17. }
  1. 看到ReentrantLock.lock()其实是调用了sync.acquire(1),sync变量是Sync类型.Sync是何许人也?

    1. Sync,NonfairSync,FairSync都是ReentrantLock的内部类<br /> Sync 继承了 AbstractQueuedSynchronizer<br /> NonfairSync(非公平锁的实现) FairSync(公平锁的实现) 都继承了 Sync<br /> Syncacquire方法继承自AQS,自己没有实现:
    1. /**
    2. * Acquires in exclusive mode, ignoring interrupts. Implemented
    3. * by invoking at least once {@link #tryAcquire},
    4. * returning on success. Otherwise the thread is queued, possibly
    5. * repeatedly blocking and unblocking, invoking {@link
    6. * #tryAcquire} until success. This method can be used
    7. * to implement method {@link Lock#lock}.
    8. *
    9. * @param arg the acquire argument. This value is conveyed to
    10. * {@link #tryAcquire} but is otherwise uninterpreted and
    11. * can represent anything you like.
    12. */
    13. public final void acquire(int arg) {
    14. if (!tryAcquire(arg) &&
    15. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    16. selfInterrupt();
    17. }

    3.1 AQS的acquire中的tryAcquire是由子类实现的,
    这个tryAcquire是NonfairSync实现的,调用了Sync的nonfairTryAcquire方法,因为ReentrantLock是默认是非公平锁
    下面代码

    1. /**
    2. * Performs non-fair tryLock. tryAcquire is implemented in
    3. * subclasses, but both need nonfair try for trylock method.
    4. */
    5. @ReservedStackAccess
    6. final boolean nonfairTryAcquire(int acquires) {
    7. final Thread current = Thread.currentThread();
    8. /*
    9. getState()调用了AQS的方法,直接返回了state属性.
    10. 如果state等于0,则CAS设置stateOffset为acquires;
    11. 如果state!=0,则判断拥有锁的线程是不是当前线程,如果不是则返回false;如果是当前线程已经持有锁,则更新state值为state+acquire(可重入)
    12. 可见ReentrantLock把state作为是否锁定的判断标准,如果state=0则表示没有线程持有锁.如果state值>0,则表示已有线程持有了锁,并且持有了state次(可重入).
    13. */
    14. int c = getState();
    15. if (c == 0) {
    16. // compareAndSetState是AQS的方法,CAS修改state值
    17. if (compareAndSetState(0, acquires)) {
    18. //AQS继承了AbstractOwnableSynchronizer
    19. //设置状态后,则进入AbstractOwnableSynchronizer的setExclusiveOwnerThread方法.表示已经拿到锁.
    20. setExclusiveOwnerThread(current);
    21. return true;
    22. }
    23. }
    24. // 没拿到锁,则判断是否锁的持有者就是当前线程
    25. else if (current == getExclusiveOwnerThread()) {
    26. // 当前线程重入
    27. int nextc = c + acquires;
    28. if (nextc < 0) // overflow
    29. throw new Error("Maximum lock count exceeded");
    30. setState(nextc);
    31. return true;
    32. }
    33. return false;
    34. }

    3.2 如果tryAcquire没有成功获得锁,则把当前线程加入等待队列

  1. /**
  2. * Creates and enqueues node for current thread and given mode.
  3. *
  4. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  5. * @return the new node
  6. */
  7. private Node addWaiter(Node mode) {
  8. Node node = new Node(mode);
  9. // 自旋把前节点加入等待队列
  10. for (;;) {
  11. Node oldTail = tail;
  12. if (oldTail != null) {
  13. node.setPrevRelaxed(oldTail);
  14. if (compareAndSetTail(oldTail, node)) {
  15. oldTail.next = node;
  16. return node;
  17. }
  18. } else {
  19. initializeSyncQueue();
  20. }
  21. }
  22. }

然后调用acquireQueued方法

  1. /**
  2. * Acquires in exclusive uninterruptible mode for thread already in
  3. * queue. Used by condition wait methods as well as acquire.
  4. *
  5. * @param node the node
  6. * @param arg the acquire argument
  7. * @return {@code true} if interrupted while waiting
  8. */
  9. final boolean acquireQueued(final Node node, int arg) {
  10. boolean interrupted = false;
  11. try {
  12. for (;;) {
  13. final Node p = node.predecessor();
  14. // 如果上一个节点是头节点,则说明下一个就该当前线程了
  15. // 那就试着去拿一下锁,tryAcquire仍是调用的子类(Sync)的方法
  16. if (p == head && tryAcquire(arg)) {
  17. setHead(node);
  18. p.next = null; // help GC
  19. return interrupted;
  20. }
  21. // 判断是否需要park,需要的话就调用LockSupport的park阻塞线程
  22. if (shouldParkAfterFailedAcquire(p, node))
  23. interrupted |= parkAndCheckInterrupt();
  24. }
  25. } catch (Throwable t) {
  26. cancelAcquire(node);
  27. if (interrupted)
  28. selfInterrupt();
  29. throw t;
  30. }
  31. }

AQS的应用

  1. 在上一篇文章中我们手写了AQS和利用AQS实现了ReentrantLockReentrantReadWriteLock的,今天我们再来用自己的AQSjdkAQS来继续实现它的衍生物:计数器CountDownLatch、信号量Semaphone、回旋栅栏CyclicBarrier(不是AQS实现)。

计数器CountDownLatch

  1. 利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他任务执行完毕之后才能执行,或者说当其他任务执行到一定数量后才执行,此时就可以利用CountDownLatch来实现这种功能了。<br /> 计数器实际上就是一把共享锁,每个线程(读线程)执行完后将锁的readCount-1。直到减少到我们认为合理的值(一般是0),而这个减少的方法就是我们countDown()的释放锁的方法,每次释放锁都会-1,但此时并不会真正的释放整个读锁,释放的只是单个读线程的锁。所以此时await()方法实际上就是另外一个写线程去抢锁,只有等到整个读线程释放锁,另一个写线程(我们测试的是main线程)才能拿到锁,才会打断阻塞。

基于JDK的AQS实现

  1. /**
  2. * @author heian
  3. * @create 2020-03-07-9:08 下午
  4. * @description 计数器就是一把共享锁,只有当线程执行之后-1,直至0
  5. */
  6. public class MyCountDownLatch {
  7. private Sync sync;
  8. public MyCountDownLatch(int count){
  9. if (count<0){
  10. throw new IllegalArgumentException("参数传入有误");
  11. }
  12. this.sync= new Sync(count);
  13. }
  14. //阻塞 只有当readCount == 0,释放共享锁,类比ReentrantLock的lock拿到锁
  15. public void await() {
  16. sync.acquireShared(1);
  17. }
  18. //countDown 就是释放锁,并把state-1
  19. public void countDown() {
  20. sync.releaseShared(1);
  21. }
  22. class Sync extends AbstractQueuedSynchronizer{
  23. //开始存值
  24. public Sync(int count){
  25. setState(count);
  26. }
  27. //await 就是抢占共享锁,当state变为0的时候就表示锁释放,就会去唤醒main或者抢锁线程抢锁
  28. @Override
  29. protected int tryAcquireShared(int arg) {
  30. return getState()<=0 ? 1:-1;
  31. }
  32. //countDown 释放锁就是-1,直至减少到0,才会释放共享锁
  33. @Override
  34. protected boolean tryReleaseShared(int arg) {
  35. //System.out.println("打印队列长度" + getQueueLength());
  36. for (;;){
  37. int newNum = getState() - arg;
  38. if (compareAndSetState(getState(),newNum)){
  39. return newNum <= 0;
  40. }
  41. }
  42. }
  43. }
  44. }

基于我们的AQS实现

  1. /**
  2. * @author heian
  3. * @create 2020-03-07-10:53 下午
  4. * @description
  5. */
  6. public class MyCountDownLatch2 {
  7. private Sync sync;
  8. public MyCountDownLatch2(int count){
  9. if (count<0){
  10. throw new IllegalArgumentException("参数传入有误");
  11. }
  12. this.sync = new Sync(count);
  13. }
  14. class Sync extends MyAQS{
  15. public Sync(int count){
  16. readCount.compareAndSet(readCount.get(),count);
  17. }
  18. // await readCount<=0 >0 表示别的线程抢锁成功 <0 失败
  19. @Override
  20. public int tryLockShared(int acquires) {
  21. return readCount.get() <=0 ? 1:-1;
  22. }
  23. //countDown别的线程抢锁,只有当readCount ==0 才会抢成功
  24. @Override
  25. public boolean tryUnlockShared(int releases) {
  26. for (;;){
  27. int newNum = readCount.get() - releases;
  28. if (readCount.compareAndSet(readCount.get(),newNum)){
  29. return readCount.get()<=0;
  30. }
  31. }
  32. }
  33. }
  34. public void await() {
  35. //我的AQS这里arg 默认是1 就没传参输
  36. this.sync.lockShared();
  37. }
  38. public void countDown() {
  39. //我的AQS这里arg 默认是1 就没传参输
  40. this.sync.unlockShared();
  41. }
  42. }

测试代码

  1. public static void main(String[] args) {
  2. MyCountDownLatch countDownLatch = new MyCountDownLatch(5);
  3. IntStream.range(0,5).forEach(value -> {
  4. new Thread(() -> {
  5. LockSupport.parkNanos(1000000000*1L);
  6. System.out.println(Thread.currentThread().getName() + "结束");
  7. countDownLatch.countDown();
  8. }).start();
  9. });
  10. countDownLatch.await();
  11. System.out.println("main开始");
  12. }
  13. Thread-2结束
  14. Thread-3结束
  15. Thread-1结束
  16. Thread-4结束
  17. Thread-0结束
  18. main开始
  1. ![屏幕快照 2020-03-08 下午12.55.07.png](https://cdn.nlark.com/yuque/0/2020/png/771792/1583643327855-5e3087be-dd3a-4969-bc71-fd10466311a3.png#align=left&display=inline&height=327&margin=%5Bobject%20Object%5D&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-03-08%20%E4%B8%8B%E5%8D%8812.55.07.png&originHeight=654&originWidth=1190&size=99376&status=done&style=none&width=595)<br /> 需要说明的就是我们这里实现AQS的两个tryLockShared/tryUnlockShared 方法是<=0。它的语义就是说:因为我们这里仅仅开辟了5个读线程去调用了释放锁的方法,而释放锁的方法会调用tryUnlockShared方法,而我们设置好的readCount数量是5,只有当它一直减少至0或者<0(读线程数量大于5)的时候才会唤醒在队列中的写线程,我们这里是main线程,才会打断await的阻塞,唤醒它重新调用tryLockShared方法,而此时由于readCount已经<=0了,这时候会抢锁成功,于是乎便打断阻塞,便可以执行main线程了。(读线程不会放入队列)

信号量Semaphone

  1. Semaphore字面意思为信号量,Semaphore可以控同时访问的线程个数,以此来达到限流的作用。<br /> 其主要有两个常用的方法 acquire() release(),前者的语义是获得一个许可,其实说白了就是去抢锁,抢到了则readCount+1,并从队列中移除,没抢则该线程挂起,只有当别的读线程执行完调用了release()方法,被挂起的线程才有机会去抢锁。<br />![屏幕快照 2020-03-08 下午2.27.30.png](https://cdn.nlark.com/yuque/0/2020/png/771792/1583648877304-7a3254e1-4a7c-4c2d-8b64-96b0f3cb9ea0.png#align=left&display=inline&height=300&margin=%5Bobject%20Object%5D&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-03-08%20%E4%B8%8B%E5%8D%882.27.30.png&originHeight=600&originWidth=1046&size=79213&status=done&style=none&width=523)

基于JDK的AQS实现

  1. /**
  2. * @author heian
  3. * @create 2020-03-07-9:08 下午
  4. * @description 信号量 进来就拿锁,拿到了就+1,释放了就-1,共享锁的线程只允许最大连接permitCount个线程
  5. */
  6. public class MySemaphone {
  7. private Sync sync;
  8. private int permitCount;
  9. public MySemaphone(int count){
  10. if (count<0){
  11. throw new IllegalArgumentException("参数传入有误");
  12. }
  13. this.sync= new Sync();
  14. this.permitCount = count;
  15. }
  16. public void acquire() {
  17. sync.acquireShared(1);
  18. }
  19. public void release() {
  20. sync.releaseShared(1);
  21. }
  22. class Sync extends AbstractQueuedSynchronizer{
  23. @Override
  24. protected int tryAcquireShared(int arg) {
  25. for (;;){
  26. int currentReadCount = getState() + arg;
  27. if (currentReadCount<=permitCount){
  28. if (compareAndSetState(getState(),currentReadCount)){
  29. return 1;
  30. }
  31. }else {
  32. return -1;
  33. }
  34. }
  35. }
  36. @Override
  37. protected boolean tryReleaseShared(int arg) {
  38. //System.out.println("打印队列长度" + getQueueLength());
  39. for (;;){
  40. int remainNum = getState() - arg;
  41. return compareAndSetState(getState(),remainNum);
  42. }
  43. }
  44. }
  45. }

测试代码

  1. public static void main(String[] args) {
  2. MySemaphone semaphore = new MySemaphone(3);
  3. IntStream.range(0,5).forEach(value -> {
  4. new Thread(() -> {
  5. semaphore.acquire();
  6. System.out.println(Thread.currentThread().getName() + " 获得许可");
  7. LockSupport.parkNanos(1000000000*2L);
  8. System.out.println(Thread.currentThread().getName() + "释放许可");
  9. semaphore.release();
  10. }).start();
  11. });
  12. }
  13. Thread-0 获得许可
  14. Thread-2 获得许可
  15. Thread-1 获得许可
  16. Thread-0释放许可
  17. Thread-1释放许可
  18. Thread-2释放许可
  19. Thread-4 获得许可
  20. Thread-3 获得许可
  21. Thread-3释放许可
  22. Thread-4释放许可

回旋栅栏CyclicBarrier

  1. 回环栅栏:让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。叫做栅栏,描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行,我们可以把这个状态就叫做barrier。<br /> CyclicBarrier并不是ASQ实现的,因为AQS内部只有一个队列,而这个队列可以用来装所有进来的线程,但是我们还需要一个队列用来装正在运行的线程。而我们每次调用await方法都会去做一个自增操作,当自增操作count=许可数目,我们便启动这个栅栏,然后重置count,在来装下一轮。<br /> 可以用我们生活的例子举例,比如我们排队等公交,一辆公车只允许装在3个人,而此时 你有7个人,则会一开始装走3个下一辆在装走3个,但是最后只剩下一个因为不满足3个人一车的条件只能被阻塞在外面不允许运行该线程。<br /> 实现回旋栅栏的方式一般有两种,一个是基于ReentranLock,另外也可以基于syncronized。这里我已syncronized+notifyAll写出实现代码。这里代码演示就是每过一秒增加一个线程运行,只有当线程数达到3方可运行。

基于syncronized+notifyAll实现

  1. /**
  2. * @author heian
  3. * @create 2020-03-08-2:34 下午
  4. * @description 循环栅栏 实现原理
  5. */
  6. public class MyCyclicBarrier {
  7. private int initCount = 0;
  8. private int permitCount;
  9. private Object generation = new Object();
  10. public MyCyclicBarrier(int permitCount){
  11. this.permitCount = permitCount;
  12. }
  13. public void await() throws InterruptedException {
  14. synchronized (this){
  15. initCount++;
  16. Object currentG = generation;
  17. if (initCount == permitCount){
  18. //进入下一次计数
  19. nextGeneration();
  20. }else {
  21. //防止伪唤醒
  22. for (;;){
  23. this.wait();//释放锁
  24. //被更改了,则直接跳出阻塞
  25. if (currentG != generation){
  26. break;
  27. }
  28. }
  29. }
  30. }
  31. }
  32. public void nextGeneration(){
  33. synchronized (this){
  34. initCount =0;
  35. generation = new Object();
  36. notifyAll();
  37. }
  38. }
  39. }

测试代码

  1. public static void main(String[] args) {
  2. MyCyclicBarrier cyclicBarrier = new MyCyclicBarrier(3);
  3. IntStream.range(0,7).forEach(value -> {
  4. new Thread(() -> {
  5. try {
  6. cyclicBarrier.await ();
  7. System.out.println (Thread.currentThread ().getName () + "一起跳出栅栏");
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. }
  11. }).start();
  12. LockSupport.parkNanos(1000000000*1L);
  13. });
  14. }
  15. //少一个线程未跳出
  16. Thread-1一起跳出栅栏
  17. Thread-0一起跳出栅栏
  18. Thread-2一起跳出栅栏
  19. Thread-5一起跳出栅栏
  20. Thread-3一起跳出栅栏
  21. Thread-4一起跳出栅栏