线程协作与流程控制

1. 并发流程控制

1.1 什么是并发流程控制

  • 并发流程控制,就是让线程之间相互配合完成任务,来满足业务逻辑
  • 如:让线程A等待线程B完成后再执行等策略

    1.2 并发流程控制的工具

    | 类 | 作用 | 说明 | | —- | —- | —- | | Semaphore | 信号量:可以通过控制“许可”的数量,来保证线程间配合 | 线程只有拿到了许可才可以继续运行 | | CyclicBarrier | 循环栅栏:线程会等待,直到足够多线程达到了规定数量,再执行下一步任务 | 适用于线程间相互等待处理结果就绪的场景 | | Phaser | 和CyclicBarrier类似,但是计数可变 | java7加入的新类 | | CountDownLatch | 也是一个计数等待相关,数量地见到0时,触发动作 | 不可重复使用 | | Exchanger | 让两个线程在合适时交换对象 | 适用于两个线程工作在同一个类的不同实例上时,用于交换数据 | | Condition | 可以控制线程的等待和唤醒 | 是Object.wati()的升级版 |

2. CountDownLatch计数门闩

2.1 作用

  • 并发流程控制的工具,用于等待数量(我们设定的)足够后再执行某些任务

2.2 主要方法

  • CountDownLatch(int count):只有一个构造方法,参数count为需要倒数的值
  • await():调用此方法的线程会被挂起,它会等到count值为零的时候才继续执行
  • countdown():讲count减1,直到0,等待的线程会被唤醒

2.3 用法一:等待线程执行完毕

  1. /**
  2. * @author yiren
  3. */
  4. public class CountDownLatchExample01 {
  5. public static void main(String[] args) throws InterruptedException {
  6. AtomicInteger integer = new AtomicInteger(1);
  7. CountDownLatch latch = new CountDownLatch(5);
  8. ExecutorService executorService = Executors.newFixedThreadPool(5);
  9. for (int i = 0; i < 5; i++) {
  10. executorService.execute(() -> {
  11. try {
  12. System.out.println(Thread.currentThread().getName()+ " produce ....");
  13. TimeUnit.SECONDS.sleep(1);
  14. integer.incrementAndGet();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. latch.countDown();
  19. }
  20. });
  21. }
  22. System.out.println(Thread.currentThread().getName() + " waiting....");
  23. latch.await();
  24. System.out.println(Thread.currentThread().getName() + " finished!");
  25. System.out.println(Thread.currentThread().getName() + " num: " + integer.get());
  26. executorService.shutdown();
  27. }
  28. }
  1. pool-1-thread-1 produce ....
  2. pool-1-thread-2 produce ....
  3. pool-1-thread-3 produce ....
  4. main waiting....
  5. pool-1-thread-4 produce ....
  6. pool-1-thread-5 produce ....
  7. main finished!
  8. main num: 6
  9. Process finished with exit code 0

2.4 用法二:多等一

  1. /**
  2. * @author yiren
  3. */
  4. public class CountDownLatchExample02 {
  5. public static void main(String[] args) throws InterruptedException {
  6. CountDownLatch latch = new CountDownLatch(1);
  7. ExecutorService executorService = Executors.newFixedThreadPool(5);
  8. for (int i = 0; i < 5; i++) {
  9. executorService.execute(() -> {
  10. System.out.println(Thread.currentThread().getName() + " ready!");
  11. try {
  12. latch.await();
  13. System.out.println(Thread.currentThread().getName()+ " produce ....");
  14. TimeUnit.SECONDS.sleep(1);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. latch.countDown();
  19. }
  20. });
  21. }
  22. Thread.sleep(10);
  23. System.out.println(Thread.currentThread().getName() + " ready!");
  24. latch.countDown();
  25. System.out.println(Thread.currentThread().getName() + " go!");
  26. executorService.shutdown();
  27. }
  28. }
  1. pool-1-thread-1 ready!
  2. pool-1-thread-4 ready!
  3. pool-1-thread-3 ready!
  4. pool-1-thread-2 ready!
  5. pool-1-thread-5 ready!
  6. main ready!
  7. main go!
  8. pool-1-thread-1 produce ....
  9. pool-1-thread-2 produce ....
  10. pool-1-thread-5 produce ....
  11. pool-1-thread-3 produce ....
  12. pool-1-thread-4 produce ....
  13. Process finished with exit code 0

2.4 注意

  • CountDownLatch不仅可以无限等待,还可以给参数,在指定的事件内如果等到就唤醒线程继续执行

    1. boolean await(long timeout, TimeUnit unit)
  • CountDownLatch不能重用,如果涉及重新计数,可以使用CyclicBarrier或者新创建CountDownLatch

3. Semaphore信号量

3.1 信号量作用

  • Semaphore可以用来限制或管理数量有限的资源使用情况
  • 信号量的租用是维护一个许可计数,线程可以获取许可,然后信号量减一;线程也可以释放许可,信号量就加一;如果信号量的许可颁发完了,其他线程想要获取,就需要等待,直到有另外的线程释放了许可。

3.2 信号量使用

  1. 初始化Semaphore指定许可数量
  2. 在需要获取许可的代码前面加上acquire()或者acquireUniterruptibly()方法
  3. 任务执行完成有调用release()释放许可

3.3 主要方法

  • Semaphore(int permits, boolean fair)这里设置许可数量,以及是否使用公平策略。
    • 如果传入true那么久吧等待线程放入到FIFO的队列里面。
  • aquire()请求许可,可以响应中断
  • aquireUnniterruptibly()请求许可不可中断
  • tryAcquire()看看现在有没有空闲的许可,如果有那就返回true;这个方法还可以设置等待时间给一个timeout,让线程等待一段时间。
  • release()释放许可

3.4 案例演示

  1. /**
  2. * @author yiren
  3. */
  4. public class SemaphoreExample01 {
  5. public static void main(String[] args) {
  6. Semaphore semaphore = new Semaphore(3, true);
  7. ExecutorService executorService = Executors.newFixedThreadPool(10);
  8. for (int i = 0; i < 8; i++) {
  9. executorService.execute(() -> {
  10. try {
  11. System.out.println(Thread.currentThread().getName()+" start to get permit");
  12. semaphore.acquire();
  13. Thread.sleep(2000);
  14. System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }finally {
  18. semaphore.release();
  19. }
  20. });
  21. }
  22. executorService.shutdown();
  23. }
  24. }
  1. pool-1-thread-1 start to get permit
  2. pool-1-thread-4 start to get permit
  3. pool-1-thread-3 start to get permit
  4. pool-1-thread-2 start to get permit
  5. pool-1-thread-5 start to get permit
  6. pool-1-thread-6 start to get permit
  7. pool-1-thread-7 start to get permit
  8. pool-1-thread-8 start to get permit
  9. pool-1-thread-3 2020-02-21T19:54:47.392 finished!
  10. pool-1-thread-1 2020-02-21T19:54:47.392 finished!
  11. pool-1-thread-4 2020-02-21T19:54:47.392 finished!
  12. pool-1-thread-6 2020-02-21T19:54:49.396 finished!
  13. pool-1-thread-2 2020-02-21T19:54:49.396 finished!
  14. pool-1-thread-5 2020-02-21T19:54:49.396 finished!
  15. pool-1-thread-8 2020-02-21T19:54:51.401 finished!
  16. pool-1-thread-7 2020-02-21T19:54:51.401 finished!
  17. Process finished with exit code 0

3.5 注意点

  • 获取和释放的许可证必须一致,acquire和release都是可以传入数值的来确定获取和释放的数量。如果我们获取和释放不一致,就会容易导致程序bug。当然也不是绝对,除非有特殊业务需求,否则都获取释放设置为一样的
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true会比较合理。如果插队情况比较严重的话,某些线程可能一直阻塞
  • 获取和释放许可对线程并不要求,线程A获取了可以线程B释放。

4. Condition接口

4.1 作用

  • 当线程A需要等待某个任务或者某个资源,就可以执行condition.await()方法,然后就会陷入阻塞状态。
  • 此时另一个线程B,去获取资源或者执行任务完成后,调用condition.signal()或者signalAll()方法,通知线程A,继续执行
  • 这个类似于object.wait()notify()notifyAll()
  • signal()方法如果遇到多个线程都在等待的时候,会去唤醒等待时间最长的那个
  • 在我们ReentrantLock中就可以直接新建Condition。看下面案例

4.2 案例演示

  • 普通用法
  1. /**
  2. * @author yiren
  3. */
  4. public class ConditionExample01 {
  5. private static ReentrantLock lock = new ReentrantLock();
  6. private static Condition condition = lock.newCondition();
  7. public static void main(String[] args) throws InterruptedException {
  8. Thread thread1 = new Thread(() -> {
  9. task1();
  10. });
  11. Thread thread2 = new Thread(() -> {
  12. task2();
  13. });
  14. thread1.start();
  15. Thread.sleep(100);
  16. thread2.start();
  17. }
  18. private static void task1() {
  19. lock.lock();
  20. try {
  21. System.out.println(Thread.currentThread().getName() + " start await()");
  22. condition.await();
  23. System.out.println(Thread.currentThread().getName() + " await finished!");
  24. Thread.sleep(10);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. } finally {
  28. lock.unlock();
  29. }
  30. }
  31. private static void task2() {
  32. lock.lock();
  33. try {
  34. System.out.println(Thread.currentThread().getName() + " start signal()");
  35. Thread.sleep(1000);
  36. condition.signal();
  37. System.out.println(Thread.currentThread().getName() + " signal finished!");
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. }
  1. Thread-0 start await()
  2. Thread-1 start signal()
  3. Thread-1 signal finished!
  4. Thread-0 await finished!
  5. Process finished with exit code 0
  • 生产者消费者模式
  1. /**
  2. * @author yiren
  3. */
  4. public class ConditionExample02 {
  5. private int queueSize = 10;
  6. private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
  7. private Lock lock = new ReentrantLock();
  8. private Condition notFull = lock.newCondition();
  9. private Condition notEmpty = lock.newCondition();
  10. public static void main(String[] args) {
  11. ConditionExample02 conditionDemo2 = new ConditionExample02();
  12. Producer producer = conditionDemo2.new Producer();
  13. Consumer consumer = conditionDemo2.new Consumer();
  14. producer.start();
  15. consumer.start();
  16. }
  17. class Consumer extends Thread {
  18. @Override
  19. public void run() {
  20. consume();
  21. }
  22. private void consume() {
  23. while (true) {
  24. lock.lock();
  25. try {
  26. while (queue.size() == 0) {
  27. System.out.println("队列空,等待数据");
  28. try {
  29. notEmpty.await();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. queue.poll();
  35. notFull.signalAll();
  36. System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
  37. } finally {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42. }
  43. class Producer extends Thread {
  44. @Override
  45. public void run() {
  46. produce();
  47. }
  48. private void produce() {
  49. while (true) {
  50. lock.lock();
  51. try {
  52. while (queue.size() == queueSize) {
  53. System.out.println("队列满,等待有空余");
  54. try {
  55. notFull.await();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. queue.offer(1);
  61. notEmpty.signalAll();
  62. System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
  63. } finally {
  64. lock.unlock();
  65. }
  66. }
  67. }
  68. }
  69. }
  • 以上使用两个Condition作为队列满和空的通知传递工具在生产者和消费者之间互通

4.3 注意点

  • 我们知道Lock可以看做synchronized的替代方案,而Condition就是用来替代object.wait/notify的,在用法上几乎一致。
  • 调用await()方法时必须持有Lock锁,否则会抛出异常,并且await()方法会释放当前持有的Lock锁,
  • 一个Lock锁可以有多个Condition更加灵活

5. CyclicBarrier循环栅栏

5.1 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当需要多个线程配合完成任务,并最后需要统一汇总时,我们就可以使用CyclicBarrier,当某个线程完成任务后,它先会等待,等到所有线程都执行好了任务,再一起继续执行剩下的任务
    • 比如:同时出去聚餐约在了公司,等大家到公司了一起走过去。
  • 但是注意CyclicBarrier是可以重复使用的,这个和CountDownLatch不同

5.2 案例

  1. /**
  2. * @author yiren
  3. */
  4. public class CyclicBarrierExample {
  5. public static void main(String[] args) {
  6. CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println("所有人都到场了, 大家统一出发!");
  10. }
  11. });
  12. for (int i = 0; i < 10; i++) {
  13. new Thread(new Task(i, cyclicBarrier)).start();
  14. }
  15. }
  16. static class Task implements Runnable {
  17. private int id;
  18. private CyclicBarrier cyclicBarrier;
  19. public Task(int id, CyclicBarrier cyclicBarrier) {
  20. this.id = id;
  21. this.cyclicBarrier = cyclicBarrier;
  22. }
  23. @Override
  24. public void run() {
  25. System.out.println("线程" + id + "现在前往集合地点");
  26. try {
  27. Thread.sleep((long) (Math.random() * 10000));
  28. System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
  29. cyclicBarrier.await();
  30. System.out.println("线程" + id + "出发了");
  31. } catch (InterruptedException | BrokenBarrierException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. }
  1. 线程0现在前往集合地点
  2. 线程2现在前往集合地点
  3. 线程3现在前往集合地点
  4. 线程1现在前往集合地点
  5. 线程4现在前往集合地点
  6. 线程5现在前往集合地点
  7. 线程6现在前往集合地点
  8. 线程7现在前往集合地点
  9. 线程8现在前往集合地点
  10. 线程9现在前往集合地点
  11. 线程3到了集合地点,开始等待其他人到达
  12. 线程9到了集合地点,开始等待其他人到达
  13. 线程8到了集合地点,开始等待其他人到达
  14. 线程4到了集合地点,开始等待其他人到达
  15. 线程5到了集合地点,开始等待其他人到达
  16. 所有人都到场了, 大家统一出发!
  17. 线程5出发了
  18. 线程3出发了
  19. 线程8出发了
  20. 线程4出发了
  21. 线程9出发了
  22. 线程1到了集合地点,开始等待其他人到达
  23. 线程6到了集合地点,开始等待其他人到达
  24. 线程0到了集合地点,开始等待其他人到达
  25. 线程7到了集合地点,开始等待其他人到达
  26. 线程2到了集合地点,开始等待其他人到达
  27. 所有人都到场了, 大家统一出发!
  28. 线程2出发了
  29. 线程1出发了
  30. 线程7出发了
  31. 线程0出发了
  32. 线程6出发了
  33. Process finished with exit code 0
  • 每五个人到了过后,就出发一批

5.3 CountDownLatchCyclicBarrier`区别

  • 作用不同:CountDownLatch使用countDown()是用于事件的,而CyclicBarrier使用await()是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0后不能再次重用,除非创建新对象;而CyclicBarrier是可以直接重用的

6. 深入AQS理解J.U.C的根基

6.1 AQS作用及其重要性

  • AQS在CountDownLatch等工具内都有使用,全称是:AbstractQueuedSynchronizer是一个抽象类
  • 锁和上面的线程并发控制类(Semaphore等)都有类似的地方。 其实他们底层都是使用了AQS作为基类的拓展
  • 正因为他们很多工作都类似,JDK就把这部分通用逻辑抽离了出来,提供给他们直接使用,使其不必关注很多深层次的细节,从而完成他们的功能。
  • 我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现

    • `Semaphore``

      1. public class Semaphore implements java.io.Serializable {
      2. private static final long serialVersionUID = -3222578661600680210L;
      3. private final Sync sync;
      4. abstract static class Sync extends AbstractQueuedSynchronizer {
      5. private static final long serialVersionUID = 1192457210091910933L;
      6. Sync(int permits) {
      7. setState(permits);
      8. }
      9. ......
    • ReentrantLock

      1. public class ReentrantLock implements Lock, java.io.Serializable {
      2. private static final long serialVersionUID = 7373984872572414699L;
      3. private final Sync sync;
      4. abstract static class Sync extends AbstractQueuedSynchronizer {
      5. private static final long serialVersionUID = -5179523762034025860L;
      6. ......
    • CountDownLatch

      1. public class CountDownLatch {
      2. private static final class Sync extends AbstractQueuedSynchronizer {
      3. private static final long serialVersionUID = 4982264981922014374L;
      4. Sync(int count) {
      5. setState(count);
      6. }
      7. ......
  • 由上源码我们可以看到,里面都有一个内部类,Sync继承自AbstractQueuedSynchronizer

  • 那么AQS是用来干些什么事情的呢?
    • J.U.C基本都是是基于AQS实现的,AQS是一个用于构建锁、同步器、线程协作工具类的框架供给子类使用,主要使用模板模式来设计。
    • 它主要工作就是管理线程的阻塞与唤醒,实现同步的管理,以及阻塞线程的队列管理工作

6.2 AQS的组成及内部原理

  • AbstractQueuedSynchronizer自JDK1.5加入,是基于FIFO等待队列实现的一个用于同步器的基础框架。
  • JDK1.8 继承AQS实现的类:

  • 我们可以看到,在可重入锁,读写锁,计数门闩等,信号量里面都是用了AQS的子类,接下来我们就学习一下AQS的内部原理

  1. AQS的三大部分
    • state:状态,
    • FIFO队列:线程竞争锁的管理队列
    • 获取和释放方法:需要工具类去实现的方法
  2. state:状态

    1. /**
    2. * The synchronization state.
    3. */
    4. private volatile int state;
    • 它的含义并不具体,根据实现的不同而不同,如:Semaphore内是剩余许可数量、CountDownLatch内是还需要倒数的数量,可看做一个计数器,只是不同类的作用及意义不用

      1. protected final boolean compareAndSetState(int expect, int update) {
      2. // See below for intrinsics setup to support this
      3. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
      4. }
    • 状态值的更新,是使用Unsafe的CAS完成

    • 在ReentrantLock中:state表示锁的占用情况,可重入的计数,每重入一次就加一,当要释放锁时,它的值就会变成0,表示不被任何线程占有。
  3. FIFO队列:

    1. /**
    2. * Head of the wait queue, lazily initialized. Except for
    3. * initialization, it is modified only via method setHead. Note:
    4. * If head exists, its waitStatus is guaranteed not to be
    5. * CANCELLED.
    6. */
    7. private transient volatile Node head;
    8. /**
    9. * Tail of the wait queue, lazily initialized. Modified only via
    10. * method enq to add new wait node.
    11. */
    12. private transient volatile Node tail;
    • 这个队列是用来存放等待的线程的,AQS会对这个队列进行管理。当多个线程竞争锁时,没有拿到锁的,就会被翻到队列中,当前拿到锁的执行任务的线程结束,AQS就会从队列中选一个线程来占有这个锁。
    • AQS维护一个双向链表的等待队列,把等待线程都放到这个队列里面管理;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
  4. 获取和释放的方法
    • 获取方法:
      • 获取操作会依赖state变量,经常会阻塞,如:获取不到锁的时候,获取不到许可的时候等
      • ReentrantLock中,就是获取锁。state+1
      • Semaphore中就是acquire获取许可,state-1,当state==0就会阻塞
      • CountDownLatch中就是await方法,就是等待state==0
    • 释放方法:
      • 释放操作不会阻塞
      • ReentrantLock中就是unlock方法调用release(1)对应state-1
      • Semaphore中就是realease,也是state-1
      • CountDownLatch中就是countDown方法,也是state-1
    • 一般情况下,实现类都会实现tryAcquiretryRelease相关方法,以对应各个类的需求

6.3 AQS的用法

  1. 指定协作逻辑,实现获取和释放方法
  2. 在内部写一个Sync类继承AQS
  3. 根据是否独占来决定重写的方法:独占使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主逻辑里面的获取释放相关方法中调用Sync的方法

7. AQS在CountDownLatch中的源码剖析

  • 下面我们以CountDownLatch为例分析源码:
  • 构造函数

    • 我们看到内部实现就是初始化一个Sync然后把计数值传入

      1. public CountDownLatch(int count) {
      2. if (count < 0) throw new IllegalArgumentException("count < 0");
      3. this.sync = new Sync(count);
      4. }
    • 我们可以看下面的CountDownlatchSync的实现,在构造方法创建的Sync传入的count调用了setState方法传入了AQSstate

  • CountDownLatch内部有一个继承AQS的Sync

    1. private static final class Sync extends AbstractQueuedSynchronizer {
    2. private static final long serialVersionUID = 4982264981922014374L;
    3. Sync(int count) {
    4. setState(count);
    5. }
    6. int getCount() {
    7. return getState();
    8. }
    9. protected int tryAcquireShared(int acquires) {
    10. return (getState() == 0) ? 1 : -1;
    11. }
    12. protected boolean tryReleaseShared(int releases) {
    13. // Decrement count; signal when transition to zero
    14. for (;;) {
    15. int c = getState();
    16. if (c == 0)
    17. return false;
    18. int nextc = c-1;
    19. if (compareAndSetState(c, nextc))
    20. return nextc == 0;
    21. }
    22. }
    23. }
  • CountDownLatchgetCount()方法

    1. public long getCount() {
    2. return sync.getCount();
    3. }
    • 我们可以看到getCount实际也是调用SyncgetCount()来获取state并返回
  • CountDownLatchcountDown()方法

    1. public void countDown() {
    2. sync.releaseShared(1);
    3. }
    • 我们看一看到它直接调用了AQSreleaseShared(1)

      1. public final boolean releaseShared(int arg) {
      2. if (tryReleaseShared(arg)) {
      3. doReleaseShared();
      4. return true;
      5. }
      6. return false;
      7. }
    • releaseShared则是回去调用CountDownLatch中实现的tryReleaseShared

      1. protected boolean tryReleaseShared(int releases) {
      2. // Decrement count; signal when transition to zero
      3. for (;;) {
      4. int c = getState();
      5. if (c == 0)
      6. return false;
      7. int nextc = c-1;
      8. if (compareAndSetState(c, nextc))
      9. return nextc == 0;
      10. }
      11. }
    • 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true

    • 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
  • CountDownLatchawait()方法

    1. public void await() throws InterruptedException {
    2. sync.acquireSharedInterruptibly(1);
    3. }
    • await则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);

      1. public final void acquireSharedInterruptibly(int arg)
      2. throws InterruptedException {
      3. if (Thread.interrupted())
      4. throw new InterruptedException();
      5. if (tryAcquireShared(arg) < 0)
      6. doAcquireSharedInterruptibly(arg);
      7. }
    • 而里面则是调用tryAcquireShared(arg) < 0看是否小于0,如果小于0就代表没有获取到锁,就调用doAcquireSharedInterruptibly(arg);入队

    • tryAcquireShared则是在CountDownLatch中的Sync实现的

      1. protected int tryAcquireShared(int acquires) {
      2. return (getState() == 0) ? 1 : -1;
      3. }
    • 如果当前state为0了(也就是说计数已经到0了)就返回一个1就不会满住上面的acquireSharedInterruptibly方法中的条件,就会放行,如果不等于0就会返回-1,此时就会入队。调用doAcquireSharedInterruptibly方法

      1. private void doAcquireSharedInterruptibly(int arg)
      2. throws InterruptedException {
      3. final Node node = addWaiter(Node.SHARED);
      4. boolean failed = true;
      5. try {
      6. for (;;) {
      7. final Node p = node.predecessor();
      8. if (p == head) {
      9. int r = tryAcquireShared(arg);
      10. if (r >= 0) {
      11. setHeadAndPropagate(node, r);
      12. p.next = null; // help GC
      13. failed = false;
      14. return;
      15. }
      16. }
      17. if (shouldParkAfterFailedAcquire(p, node) &&
      18. parkAndCheckInterrupt())
      19. throw new InterruptedException();
      20. }
      21. } finally {
      22. if (failed)
      23. cancelAcquire(node);
      24. }
      25. }
    • 这个方法首先会把当前线程在addWaiter中包装成一个Node节点并添加到队列尾部;而这个Node节点就是FIFO队列的节点。

    • 然后就会进入循环,如果当前节点不是head,那么就会进入到后面的判断,其中重要的是parkAndCheckInterrupt,方法如下:

      1. private final boolean parkAndCheckInterrupt() {
      2. LockSupport.park(this);
      3. return Thread.interrupted();
      4. }
    • 它会调用LockSupportpark并且此park方法就是封装了Unsafe的native方法park()来把线程挂起进入阻塞状态

      1. public static void park(Object blocker) {
      2. Thread t = Thread.currentThread();
      3. setBlocker(t, blocker);
      4. UNSAFE.park(false, 0L);
      5. setBlocker(t, null);
      6. }
    • 再往下就没意义了。我们只需要知道doAcquireSharedInterruptibly方法就是把当前线程放到阻塞队列中,并且把线程阻塞就OK了。

  • AQS在CountDownLatch中使用的一些点:
    • 调用CountDownLatchawait()时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞
    • 可以获取到的条件就是计数器为0,也就是state==0的时候。
    • 只有每次调用countDown方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。

8. AQS在Semaphore中的源码剖析

  • 由于上面讲得很细了,接下来就简略一些
  • Semaphorestate就是许可证的数量
  • 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  • 先看下acquire调用的acquireSharedInterruptibly此方法在上面已经说过。
  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }
  • 而在Semaphore中Sync有两个实现:NonfairSyncFairSync
  • 在FairSync中tryAcquireShared就会有hasQueuedPredecessors判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly方法中去调用doAcquireSharedInterruptibly入队并且阻塞线程
  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. if (hasQueuedPredecessors())
  4. return -1;
  5. int available = getState();
  6. int remaining = available - acquires;
  7. if (remaining < 0 ||
  8. compareAndSetState(available, remaining))
  9. return remaining;
  10. }
  11. }
  • 而在NonfairSync中而是直接调用SyncnonfairTryAcquireShared

    1. protected int tryAcquireShared(int acquires) {
    2. return nonfairTryAcquireShared(acquires);
    3. }
    1. final int nonfairTryAcquireShared(int acquires) {
    2. for (;;) {
    3. int available = getState();
    4. int remaining = available - acquires;
    5. if (remaining < 0 ||
    6. compareAndSetState(available, remaining))
    7. return remaining;
    8. }
    9. }
    • 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
  • release中则是调用AQS的releaseShared其也是调用SemaphoreSynctryReleaseShared来判断是否需要释放锁,去唤醒阻塞线程

    1. public final boolean releaseShared(int arg) {
    2. if (tryReleaseShared(arg)) {
    3. doReleaseShared();
    4. return true;
    5. }
    6. return false;
    7. }
  • tryReleaseShared

    1. protected final boolean tryReleaseShared(int releases) {
    2. for (;;) {
    3. int current = getState();
    4. int next = current + releases;
    5. if (next < current) // overflow
    6. throw new Error("Maximum permit count exceeded");
    7. if (compareAndSetState(current, next))
    8. return true;
    9. }
    10. }
  • 我们可以看到此处就是关于Semaphore的已获取许可的释放 把state加回去然后用CAS更新state

9. AQS在ReentrantLock中的应用

  • ReentrantLock中,state主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
  • ReentrantLock中与AQS相关的有三个类:UnfairSyncFairSyncSync
  • 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
  • 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
  • 相信在上面的源码分析过后,分析ReentrantLock是十分简单的。大家可以自行分析。