1.AQS—锁的底层支持

AbstractQueuedSynchronizer 抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中的锁的底层就是使用的AQS实现的,另外大多数的开发者可能永远都不会去使用AQS

image.png

由上图可以看得出 AQS 是一个先进先出的双向队列,其中内部通过节点 head tail 记录队尾和队首的元素

Node节点:

  1. 队列的元素的类型为 Node ,其中Node的thread 变量用来存储进入到AQS中的线程
  2. Node节点内部的 SHARED 用来标识该线程是获取共享资源时被阻塞挂起后放入到AQS队列中的
  3. Node节点内部的 EXCLUSIVE 用来标识线程获取独占资源时被挂起后放入AQS队列中的
  4. waitStatus 记录当前线程的状态,可以为 CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在队列里面等待),PROPAGATE(释放共享资源后需要通知其他节点)
  5. pre 记录当前节点的前驱节点
  6. next 记录当前节点的后继节点

解读:
**

  1. 在AQS内部维持了一个单一的状态信息 State ,可以通过getState 和 setState 以及 compareAndSetState 函数修改其值,对于 ReentrantLock 的实现来说, state 可以用来表示当前线程获取锁的可重入次数,对于读写锁 ReentrantReadWriteLock 来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数,对于 semaphore 来说,state用来表示当前可用信号的个数,对于 CountDownlatch 来说,state用来表示计数器的当前的值。
  2. AQS内部有一个类 ConditionObject ,用来结合锁实现线程的同步,ConditionObject可以直接访问AQS对象内部的变量,比如 state 状态值和AQS队列,ConditionObject是条件变量,每个变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后阻塞的线程的,如上图所示,这个条件队列的头尾元素分别位 firstWaiter 和 lastWaiter
  3. 对于AQS来说,线程同步的关机按就是对 状态值 state 进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式,在独占方式下获取和释放资源使用的方法位:

    1. void acquire(int arg);
    2. void acquireInterruptibly(int arg);
    3. boolean relase(int arg);

    在共享方式下获取和释放资源的方法为:

    1. void acquireShared(int arg);
    2. void acquireSharedInterruptibly(int arg);
    3. boolean relaseShared(int arg);
  4. 使用独占的方式获取的资源是和具体的线程绑定的,也就是说如果一个线程获取到资源,就会标记这个线程获取到了,其他线程在尝试操作 state 获取资源的适合就会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁 ReentrantLock 的实现,当一个线程获取到 lock 锁以后,在 AQS 内部会首先使用 CAS 操作把 state的 状态值从 0 变为 1,然后设置当前锁的持有者线程为当前线程,当线程再次获取锁时发现它就是锁的持有者,则会把状态值从 1 变成 2,也就是设置可重入的次数,而当另外一个线程获取锁时发现自己不是该锁的持有者就会被放入 AQS阻塞队列后挂起

  5. 对应共享方式的资源和具体线程是不相关的,当多个线程去请求资源时通过CAS方法竞争获取资源,当一个线程获取到资源后,另外一个线程再去获取时如果当前资源还能满足它的需求,则当前线程只需要使用CAS方法进行获取即可。比如 Semaphore 信号量,当一个线程通过 acquire() 方法获取信号量时,首先会去看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列中,如果满足则通过自旋CAS获取信号量

1.1独占方式下获取和释放资源的流程

1.当一个线程调用 acquire(int arg)方法获取独占资源的时候,会首先使用 tryAcquire() 方法尝试获取资源,具体是设置状态变量 state 的值,成功后则直接返回,失败则将当前线程封装为 Node.EXCLUSIVE 的Node 节点后插入到 AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己

  1. // ReentrantLock lock.lock()
  2. public void lock() {
  3. sync.lock();
  4. }
  5. //继续向下,查看非公平锁
  6. final void lock() {
  7. //尝试获取锁,将state的状态值设置为1
  8. if (compareAndSetState(0, 1))
  9. setExclusiveOwnerThread(Thread.currentThread());
  10. else
  11. //尝试失败则
  12. acquire(1);
  13. }
  14. public final void acquire(int arg) {
  15. //再次尝试获取锁,如果失败返回 false并将自己封装为 Node.EXCLUSIVE 的 Node 加入到 AQS队列尾部
  16. if (!tryAcquire(arg) &&
  17. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  18. selfInterrupt();
  19. }
  20. //继续 tryAcquire(1)
  21. protected final boolean tryAcquire(int acquires) {
  22. return nonfairTryAcquire(acquires);
  23. }
  24. //nonfairTryAcquire
  25. final boolean nonfairTryAcquire(int acquires) {
  26. final Thread current = Thread.currentThread();
  27. int c = getState();
  28. //如果当前线程第一次获取锁,且锁没有被其他线程获取,则设置state 的值 为 0
  29. if (c == 0) {
  30. if (compareAndSetState(0, acquires)) {
  31. //设置当前线程为锁的持有者
  32. //也就是将 AbstractOwnableSynchronizer 类中的 private transient Thread exclusiveOwnerThread; 进行赋值
  33. setExclusiveOwnerThread(current);
  34. return true;
  35. }
  36. }
  37. //如果发现是 c!= 0 锁被持有,且是当前线程持有的,则说明当前线程再次获取锁,设置可重入的次数(也就是当前线程多少次获取了该锁)
  38. else if (current == getExclusiveOwnerThread()) {
  39. int nextc = c + acquires;
  40. if (nextc < 0) // overflow
  41. throw new Error("Maximum lock count exceeded");
  42. //设置可重入的次数
  43. setState(nextc);
  44. return true;
  45. }
  46. return false;
  47. }

2.当一个线程调用 relase(int arg)方法时会尝试使用 tryRelase 操作释放资源,这里是设置状态变量 state的值,然后调用 LockSupport.unpark(thread) 方法激活AQS队列里面阻塞的一个线程(thread),被激活的线程则使用 tryAcquire 尝试,看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被机会,然后继续向下运行,否则还是会被加入AQS队列并被挂起

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. //继续向下,查看非公平锁的具体实现
  5. public final boolean release(int arg) {
  6. if (tryRelease(arg)) {
  7. Node h = head;
  8. if (h != null && h.waitStatus != 0)
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }
  14. //在 Sync 类中 可以看到
  15. protected final boolean tryRelease(int releases) {
  16. //这里就说明获取几次锁就需要释放几次锁
  17. //当一个线程可重入的获取锁的时候,状态值 会被 + 1,每次获取一次锁就会执行 int nextc = c + acquires; 这个操作
  18. //同样在释放锁的时候 int c = getState() - releases; 执行 - 1 操作
  19. //只有当 c == 0 的时候才说明当前线程释放锁成功,不然会返回 false,继续持有锁
  20. int c = getState() - releases;
  21. if (Thread.currentThread() != getExclusiveOwnerThread())
  22. throw new IllegalMonitorStateException();
  23. boolean free = false;
  24. if (c == 0) {
  25. free = true;
  26. setExclusiveOwnerThread(null);
  27. }
  28. setState(c);
  29. return free;
  30. }

需要注意的是,AQS类并没有提供可用的 tryAcquire 和 tryRelase 方法,正如 AQS 是锁阻塞赛和同步器的基础框架一样,tryAcquire 和 tryRelase 需要由具体的子来实现。子类在实现 tryAcquire 和 reyRelase 时要根据具体的场景使用 CAS 算法尝试修改 state的状态值,成功则返回 true,否则则返回 false。子类还需要定义,在调用acquire 和 relase方法时state状态值的增减代表什么含义
比如几次自 AQS 实现的独占锁 ReentrantLock,定义status 为0的时候表示锁空闲,为1表示锁以及被占用,在重写tryAcquire时,在内部需要使用 CAS 算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false
比如继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前 state的值从 1修改为0,并设置当前锁的持有这为 null ,然后返回true,如果CAS失败则返回false

1.2共享方式下获取和释放资源

获取:

1.当线程调用 acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量 state的值,成功则直接返回,失败将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己

  1. //查看 ReentrantReadWriteLock 中读锁就是操作共享资源的
  2. public void lock() {
  3. sync.acquireShared(1);
  4. }
  5. public final void acquireShared(int arg) {
  6. if (tryAcquireShared(arg) < 0)
  7. doAcquireShared(arg);
  8. }
  9. //读写锁中的 tryAcquireShared
  10. protected final int tryAcquireShared(int unused) {
  11. Thread current = Thread.currentThread();
  12. int c = getState();
  13. //exclusiveCount:state&0x0000FFFF 会将高16位全部置0,计算结果就是获取的写锁的数量
  14. //
  15. if (exclusiveCount(c) != 0 &&
  16. getExclusiveOwnerThread() != current)
  17. return -1;
  18. //将 state 右移16位
  19. //获取state的高16位,然后CAS进行递增state,高16位味获取读锁的次数
  20. int r = sharedCount(c);
  21. /**
  22. * 在下面的代码中进行了三个判断:
  23. * 1、读锁是否应该排队。如果没有人排队,就进行if后面的判断。有人排队,就不会进行if后面的判断,而是最终调用fullTryAcquireShared()方法
  24. * 2、读锁数量是否超过最大值。(最大数量为2的16次方-1)
  25. * 3、尝试修改同步变量的值
  26. */
  27. if (!readerShouldBlock() &&
  28. r < MAX_COUNT &&
  29. //c + SHARED_UNIT;将高16位得到的结果 + 1
  30. compareAndSetState(c, c + SHARED_UNIT)) {
  31. //说明第一次获取读锁
  32. if (r == 0) {
  33. firstReader = current;
  34. firstReaderHoldCount = 1;
  35. } else if (firstReader == current) {
  36. //读锁数量不为0且firstReader(第一次获取读的线程)为当前线程,就将firstReaderHoldCount累加
  37. firstReaderHoldCount++;
  38. } else {
  39. // 读锁数量不为0,且第一个获取到读锁的线程不是当前线程
  40. // 下面这一段逻辑就是保存当前线程获取读锁的次数,如何保存的呢?
  41. // 通过ThreadLocal来实现的,readHolds就是一个ThreadLocal的实例
  42. HoldCounter rh = cachedHoldCounter;
  43. if (rh == null || rh.tid != getThreadId(current))
  44. cachedHoldCounter = rh = readHolds.get();
  45. else if (rh.count == 0)
  46. readHolds.set(rh);
  47. rh.count++;
  48. }
  49. return 1;
  50. }
  51. //最后去尝试获取锁
  52. return fullTryAcquireShared(current);
  53. }

解读:

  1. tryAcquireShared()方法中,会先通过exclusiveCount()方法来计算写锁的数量,如果写锁存在,再判断持有写锁的线程是不是当前线程,如果不是当前线程,就表示写锁被其他线程给占用,此时当前线程不能获取读锁。tryAcquireShared()方法返回-1,表示获取读锁失败。如果写锁不存在或者持有写锁的线程是当前线程,那么就表示当前线程有机会获取到读锁。
  2. 接下里会判断当前线程获取读锁是否不需要排队,读锁数量是否会超过最大值,以及通过CAS修改读锁的状态是否成功(将state的值加 1<<16)。如果这三个条件成立,就进入if语句块中,这一块的代码比较繁琐,但是功能比较单一,就是统计读锁的数量以及当前线程对读锁的重入次数,底层原理就是ThreadLocal。因为在读写锁中提供了getReadLockCount()、getReadHoldCount()等方法,这几个方法的数据就来自这儿。
  3. 如果上面的三个条件有一个不成立,就不会进入if语句块,那么就会调用fullTryAcquireShared()方法。该方法的作用就是让线程不停的获取锁,其源码如下。
  1. final int fullTryAcquireShared(Thread current) {
  2. HoldCounter rh = null;
  3. for (;;) {
  4. int c = getState();
  5. // 锁的状态为写锁时,持有锁的线程不等于当期那线程,就说明当前线程获取锁失败,返回-1
  6. if (exclusiveCount(c) != 0) {
  7. if (getExclusiveOwnerThread() != current)
  8. return -1;
  9. } else if (readerShouldBlock()) {
  10. // Make sure we're not acquiring read lock reentrantly
  11. if (firstReader == current) {
  12. // assert firstReaderHoldCount > 0;
  13. } else {
  14. if (rh == null) {
  15. rh = cachedHoldCounter;
  16. if (rh == null || rh.tid != getThreadId(current)) {
  17. rh = readHolds.get();
  18. if (rh.count == 0)
  19. readHolds.remove();
  20. }
  21. }
  22. if (rh.count == 0)
  23. return -1;
  24. }
  25. }
  26. if (sharedCount(c) == MAX_COUNT)
  27. throw new Error("Maximum lock count exceeded");
  28. // 尝试设置同步变量的值,只要设置成功了,就表示当前线程获取到了锁,然后就设置锁的获取次数等相关信息
  29. if (compareAndSetState(c, c + SHARED_UNIT)) {
  30. if (sharedCount(c) == 0) {
  31. firstReader = current;
  32. firstReaderHoldCount = 1;
  33. } else if (firstReader == current) {
  34. firstReaderHoldCount++;
  35. } else {
  36. if (rh == null)
  37. rh = cachedHoldCounter;
  38. if (rh == null || rh.tid != getThreadId(current))
  39. rh = readHolds.get();
  40. else if (rh.count == 0)
  41. readHolds.set(rh);
  42. rh.count++;
  43. cachedHoldCounter = rh; // cache for release
  44. }
  45. return 1;
  46. }
  47. }
  48. }
  1. 当获取到读锁成功以后,tryAcquireShared()方法会返回1,这样当回到AQSacquireShared()方法时,就会直接结束了。如果获取锁失败,tryAcquireShared()方法会返回-1,那么在AQS中,就会接着执行doAcquireShared()方法。doAcquireShared()方法的作用就是将自己加入到同步队列中,等待获取锁,直到获取锁成功。该方法不响应中断。

释放:
**
2.当一个线程调用 releaseShared(int arg)时会尝试使用 tryReleaseShared 操作释放资源,这里是设置状态变量 state的值,然后使用LockSupport.unpark(thread)激活AQS队列中被阻塞一个线程,被激活的线程则使用tryReleaseShared 查看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下允许,否则还是会被放入到AQS队列中配挂起

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected final boolean tryReleaseShared(int unused) {
  12. Thread current = Thread.currentThread();
  13. if (firstReader == current) {
  14. // assert firstReaderHoldCount > 0;
  15. if (firstReaderHoldCount == 1)
  16. firstReader = null;
  17. else
  18. firstReaderHoldCount--;
  19. } else {
  20. HoldCounter rh = cachedHoldCounter;
  21. if (rh == null || rh.tid != getThreadId(current))
  22. rh = readHolds.get();
  23. int count = rh.count;
  24. if (count <= 1) {
  25. readHolds.remove();
  26. if (count <= 0)
  27. throw unmatchedUnlockException();
  28. }
  29. --rh.count;
  30. }
  31. for (;;) {
  32. int c = getState();
  33. int nextc = c - SHARED_UNIT;
  34. if (compareAndSetState(c, nextc))
  35. // Releasing the read lock has no effect on readers,
  36. // but it may allow waiting writers to proceed if
  37. // both read and write locks are now free.
  38. return nextc == 0;
  39. }
  40. }
  1. tryReleaseShared()方法中,会先修改和读锁计数有关的数据,然后在for的死循环中,通过CAS操作将state的值减去1<<16。如果CAS操作成功,才会从for循环中退出。当读锁数量为0时,tryReleaseShared()返回true,表示锁被完全释放。
  2. tryReleaseShared()方法返回后,接下来的步骤和共享锁的释放逻辑完全一样的。

同样需要注意的是,AQS类中并没有提供可用的 tryAcquireShared 和 tryReleaseShared 方法,正如 AQS 是锁阻塞赛和同步器的基础框架一样,tryAcquireShared 和 tryRelaseShared 需要由具体的子来实现。子类在实现 tryAcquireShared 和 tryRelaseShared 时要根据具体的场景使用 CAS 算法尝试修改 state的状态值,成功则返回 true,否则则返回 false。
比如继承自AQS实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写tryAcquireShared 时,首先去查看写锁是否被其他线程持有,如果时则直接返回 false,否则使用CAS递增 state的高16位(在ReentrantReadWriteLock 中,state的高16位为获取读锁的次数)
比如继承自AQS实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryReleaseShared 时,在内部需要使用 CAS 算法把当前的 state的值的高16位 - 1然后返回true,如果CAS 失败则直接返回 false