一、应用

场景一:多线程等待单线程资源

  1. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
  2. 3,
  3. 5,
  4. 100,
  5. TimeUnit.SECONDS,
  6. new ArrayBlockingQueue<>(100),
  7. (ThreadFactory) Thread::new);
  8. public static void main(String[] args) throws InterruptedException {
  9. multipleAwait();
  10. //singleAwait();
  11. }
  12. //多线程等待单个线程资源
  13. private static void multipleAwait() throws InterruptedException {
  14. CountDownLatch countDownLatch = new CountDownLatch(1);
  15. for (int i = 0; i < 3; i++) {
  16. executor.execute(() -> {
  17. try {
  18. System.out.println(Thread.currentThread().getName() + "等待主线程资源");
  19. countDownLatch.await();
  20. TimeUnit.SECONDS.sleep(1);
  21. System.out.println(Thread.currentThread().getName() + "执行完成");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. });
  26. }
  27. executor.shutdown();
  28. TimeUnit.SECONDS.sleep(3);
  29. countDownLatch.countDown();
  30. System.out.println("主线程执行完成");
  31. }

场景二:单线程等待多线程执行完成

  1. //多个线程分别执行任务,全部执行完成,进行汇总。
  2. private void singleAwait() throws InterruptedException {
  3. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  4. 3,
  5. 5,
  6. 100,
  7. TimeUnit.SECONDS,
  8. new ArrayBlockingQueue<>(100),
  9. (ThreadFactory) Thread::new);
  10. CountDownLatch countDownLatch = new CountDownLatch(4);
  11. //多线程等待
  12. for (int i = 0; i <= 3; i++) {
  13. executor.execute(() -> {
  14. try {
  15. TimeUnit.SECONDS.sleep(1);
  16. System.out.println(Thread.currentThread().getName() + "执行完成");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. countDownLatch.countDown();
  21. }
  22. });
  23. }
  24. executor.shutdown();
  25. System.out.println("等待其他任务任务执行");
  26. countDownLatch.await();
  27. System.out.println("任务执行完成");
  28. }
  29. 执行结果:
  30. 等待其他任务任务执行
  31. Thread-1执行完成
  32. Thread-2执行完成
  33. Thread-0执行完成
  34. Thread-1执行完成
  35. 任务执行完成

二、原理分析

底层仍然依赖AQS

await方法

尝试获取共享锁

  1. /**
  2. * 模拟。调用获取共享锁方法。
  3. * 如果state不为0.则认为未获取到共享锁。则当前线程进入阻塞队列中。
  4. **/
  5. public void await() throws InterruptedException {
  6. sync.acquireSharedInterruptibly(1);
  7. }
  8. //底层调用AQS方法.中断则抛出异常
  9. public final void acquireSharedInterruptibly(int arg)
  10. throws InterruptedException {
  11. if (Thread.interrupted())
  12. throw new InterruptedException();
  13. if (tryAcquireShared(arg) < 0)
  14. doAcquireSharedInterruptibly(arg);
  15. }
  16. //countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败
  17. protected int tryAcquireShared(int acquires) {
  18. return (getState() == 0) ? 1 : -1;
  19. }

doAcquireSharedInterruptibly方法。获取锁失败逻辑

  1. /**
  2. * 具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒
  3. *
  4. * 获取锁失败处理逻辑。
  5. * 1.创建共享节点,插入同步等待队列中(CLH)
  6. * 2.如果,当前节点是头结点(或者节点重新被唤醒),则再次尝试获取锁。
  7. * 2.1.获取锁成功,则修改等待列表头结点
  8. * 2.2.并尝试唤醒队列中其他节点。(具体过程,如果state!=0,则线程阻塞;等待state=0的那个线程从阻塞队列中唤醒)
  9. * 3.不是头结点,
  10. * 3.1.第一次循环:修改waitstatus=-1
  11. * 3.2.第二次循环:阻塞当前线程。
  12. **/
  13. private void doAcquireSharedInterruptibly(int arg)
  14. throws InterruptedException {
  15. final Node node = addWaiter(Node.SHARED);
  16. boolean failed = true;
  17. try {
  18. for (;;) {
  19. final Node p = node.predecessor();
  20. if (p == head) {
  21. int r = tryAcquireShared(arg);
  22. if (r >= 0) {
  23. setHeadAndPropagate(node, r);
  24. p.next = null; // help GC
  25. failed = false;
  26. return;
  27. }
  28. }
  29. if (shouldParkAfterFailedAcquire(p, node) &&
  30. parkAndCheckInterrupt())
  31. throw new InterruptedException();
  32. }
  33. } finally {
  34. if (failed)
  35. cancelAcquire(node);
  36. }
  37. }

await(long timeout, TimeUnit unit)方法

�尝试获取共享锁。当前线程,阻塞timeout时间后,自动唤醒

  1. /**
  2. * 模拟。调用获取共享锁方法。
  3. * 如果state不为0.则认为未获取到共享锁。则【当前线程】进入阻塞队列中,并阻塞指定时间(timeout)。
  4. **/
  5. public boolean await(long timeout, TimeUnit unit)
  6. throws InterruptedException {
  7. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  8. }
  9. // 如果获取锁成功(即state==0),直接返回
  10. // 不成功,则执行 doAcquireSharedNanos
  11. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  12. throws InterruptedException {
  13. if (Thread.interrupted())
  14. throw new InterruptedException();
  15. return tryAcquireShared(arg) >= 0 ||
  16. doAcquireSharedNanos(arg, nanosTimeout);
  17. }
  18. //countdownlatch实现。如果不等于0,则返回-1.标识获取共享锁失败
  19. protected int tryAcquireShared(int acquires) {
  20. return (getState() == 0) ? 1 : -1;
  21. }

doAcquireSharedNanos(arg, nanosTimeout)

  1. //使用LockSupport.parkNanos进行定时阻塞
  2. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  3. throws InterruptedException {
  4. if (nanosTimeout <= 0L)
  5. return false;
  6. final long deadline = System.nanoTime() + nanosTimeout;
  7. final Node node = addWaiter(Node.SHARED);
  8. boolean failed = true;
  9. try {
  10. for (;;) {
  11. final Node p = node.predecessor();
  12. if (p == head) {
  13. int r = tryAcquireShared(arg);
  14. if (r >= 0) {
  15. setHeadAndPropagate(node, r);
  16. p.next = null; // help GC
  17. failed = false;
  18. return true;
  19. }
  20. }
  21. nanosTimeout = deadline - System.nanoTime();
  22. if (nanosTimeout <= 0L)
  23. return false;
  24. if (shouldParkAfterFailedAcquire(p, node) &&
  25. nanosTimeout > spinForTimeoutThreshold)
  26. LockSupport.parkNanos(this, nanosTimeout);
  27. if (Thread.interrupted())
  28. throw new InterruptedException();
  29. }
  30. } finally {
  31. if (failed)
  32. cancelAcquire(node);
  33. }
  34. }

countDown方法

释放资源,与共享锁区别。state进行减1操作

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }

�doReleaseShared

唤醒阻塞队列中阻塞节点

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. //唤醒节点,需要将[前一个节点]waitstate,-1--->0
  7. if (ws == Node.SIGNAL) {
  8. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  9. continue; // loop to recheck cases
  10. unparkSuccessor(h);
  11. }
  12. else if (ws == 0 &&
  13. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  14. continue; // loop on failed CAS
  15. }
  16. if (h == head) // loop if head changed
  17. break;
  18. }
  19. }