1.什么是Latch

在本章中,我们将介绍Latch(门阀) 设计模式,该模式指定了一个屏障,只有所有的条件都达到满足的时候,门阀才能打开。

2.CountDownLatch程序实现

2.1 无限等待的Latch

在代码中, 首先定义了一个无限等待的抽象类Latch, 在Latch抽象类中定义了 await方法、countDown方法以及getUnarrived方法, 这些方法的用途在代码注释中都有详细介绍,当然在Latch中的limit属性至关重要,当limit降低到0时门阀将会被打开

  1. public abstract class Latch {
  2. // 用于控制多少个线程完成任务时才能打开阀门
  3. protected int limit;
  4. // 通过构造函数传入limit
  5. public Latch(int limit) {
  6. this.limit = limit;
  7. }
  8. // 该方法会使得当前线程一直等待,直到所有的线程都完成工作被阻塞的线程是允许被中断的
  9. public abstract void await() throws InterruptedException;
  10. // 当任务线程完成工作之后调用该方法使得计数器减一
  11. public abstract void countDown();
  12. // 获取当前还有多少个线程没有完成任务
  13. public abstract int getUnarrived();
  14. }

子任务数量达到limit的时候,门阀才能打开,await() 方法用于等待所有的子任务完成,如果到达数量未达到limit的时候,将会无限等待下去,当子任务完成的时候调用countDown() 方法使计数器减少一个,表明我已经完成任务了,getUnarrived() 方法主要用于查询当前有多少个子任务还未结束。

1.无限等待CountDownLatch实现

  1. public class CountDownLatch extends Latch{
  2. public CountDownLatch(int limit) {
  3. super(limit);
  4. }
  5. @Override
  6. public void await() throws InterruptedException {
  7. synchronized (this) {
  8. // 当limit > 0时,当前线程进入堵塞状态
  9. while(limit > 0 ) {
  10. this.wait();
  11. }
  12. }
  13. }
  14. @Override
  15. public void countDown() {
  16. synchronized (this) {
  17. if ( limit <= 0 )
  18. throw new IllegalStateException("all of task already arrived");
  19. // 使limit减一,并且通知阻塞线程
  20. limit--;
  21. this.notifyAll();
  22. }
  23. }
  24. @Override
  25. public int getUnarrived() {
  26. // 返回有多少线程还未完成任务
  27. return limit;
  28. }
  29. }

在上述代码中, await() 方法不断判断limit的数量, 大于0时门阀将不能打开, 需要持续等待直到limit数量为0为止; countDown() 方法调用之后会导致limit—操作, 并且通知wait中的线程再次判断limit的值是否等于0, 当limit被减少到了0以下, 则抛出状态非法的异常; getUnarrived() 获取当前还有多少个子任务未完成, 这个返回值并不一定就是准确的, 在多线程的情况下, 某个线程在获得Unarrived任务数量并且返回之后, 有可能limit又被减少, 因此getUnarrived() 是一个评估值。

2.程序测试齐心协力打开门阀

  1. /*
  2. 线程
  3. */
  4. public class ProgrammerTravel extends Thread{
  5. // 门阀
  6. private final Latch latch;
  7. // 程序
  8. private final String programmer;
  9. // 交通工具
  10. private final String transportation;
  11. // 构造函数
  12. public ProgrammerTravel(Latch latch, String programmer, String transportation) {
  13. this.latch = latch;
  14. this.programmer = programmer;
  15. this.transportation = transportation;
  16. }
  17. @Override
  18. public void run() {
  19. try {
  20. // 花费路上的时间
  21. TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. latch.countDown();
  26. }
  27. }

TimeUnit.SECONDS.sleep(Thread Local Random.current() .next Int(10) ) 子句在run方法中模拟每个人到达目的地所花费的时间,当他们分别到达目的地的时候,需要执行latch.countDown(),使计数器减少一个以标明自己已到达, 代码如下:

  1. public class LatchClient {
  2. public static void main(String[] args) throws InterruptedException {
  3. // 定义Latch, limit 为 4
  4. Latch latch = new CountDownLatch(4);
  5. new ProgrammerTravel(latch, "Alex", "Bus");
  6. new ProgrammerTravel(latch, "Gavin","Walking");
  7. latch.await();
  8. System.out.println("all arrived");
  9. }
  10. }

2.2 有超时设置的Latch

1.可超时的等待

在Latch中增加可超时的抽象方法await(TimeUnit unit, long time) 的示例代码如下:

  1. public abstract void await(TimeUnit unit, long time) throws InterruptedException;

其中TimeUnit代表wait的时间单位,而time则是指定数量的时间单位,在该方法中又增加了WaitTimeoutException用于通知当前的等待已经超时,与之相关的代码如所示。

  1. public class WaitTImeoutException extends Exception{
  2. public WaitTImeoutException(String message) {
  3. super(message);
  4. }
  5. }

超时功能实现

  1. @Override
  2. public void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {
  3. if ( time <= 0 )
  4. throw new IllegalArgumentException("the time is invalid");
  5. long remainingNanos = unit.toNanos(time);
  6. // 等待任务将在endNanos纳秒后超时
  7. final long endNaos = System.nanoTime() + remainingNanos;
  8. synchronized (this) {
  9. while( limit > 0 ) {
  10. // 如果超市则抛出WaitTimeoutException异常
  11. if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )
  12. throw new WaitTImeoutException("the wait time over specify time.");
  13. // 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
  14. this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
  15. remainingNanos = endNaos - System.nanoTime();
  16. }
  17. }
  18. }

2.收到超时通知

  1. public class LatchClient2 {
  2. public static void main(String[] args) {
  3. Latch latch = new CountDownLatch(2);
  4. new ProgrammerTravel(latch, "Alex", "Bus").start();
  5. new ProgrammerTravel(latch, "Gavin","Walking").start();
  6. try {
  7. latch.await(TimeUnit.SECONDS, 5);
  8. System.out.println("all arrived");
  9. } catch (InterruptedException | WaitTImeoutException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }

3.扩展功能
Latch的作用是为了等待所有子任务完成后再执行其他任务, 因此可以对Latch进行再次的扩展,增加回调接口用于运行所有子任务完成后的其他任务,增加了回调功能的CountDownLatch代码如下:

  1. public class CountDownLatch extends Latch{
  2. public CountDownLatch(int limit) {
  3. super(limit);
  4. }
  5. private Runnable runnable;
  6. public CountDownLatch(int limit, Runnable runnable) {
  7. this(limit);
  8. this.runnable = runnable;
  9. }
  10. @Override
  11. public void await() throws InterruptedException {
  12. synchronized (this) {
  13. // 当limit > 0时,当前线程进入堵塞状态
  14. while(limit > 0 ) {
  15. this.wait();
  16. }
  17. }
  18. if ( null != null ) {
  19. runnable.run();
  20. }
  21. }
  22. @Override
  23. public void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {
  24. if ( time <= 0 )
  25. throw new IllegalArgumentException("the time is invalid");
  26. long remainingNanos = unit.toNanos(time);
  27. // 等待任务将在endNanos纳秒后超时
  28. final long endNaos = System.nanoTime() + remainingNanos;
  29. synchronized (this) {
  30. while( limit > 0 ) {
  31. // 如果超市则抛出WaitTimeoutException异常
  32. if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )
  33. throw new WaitTImeoutException("the wait time over specify time.");
  34. // 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
  35. this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
  36. remainingNanos = endNaos - System.nanoTime();
  37. }
  38. }
  39. if ( null != runnable ) {
  40. runnable.run();
  41. }
  42. }
  43. @Override
  44. public void countDown() {
  45. synchronized (this) {
  46. if ( limit <= 0 )
  47. throw new IllegalStateException("all of task already arrived");
  48. // 使limit减一,并且通知阻塞线程
  49. limit--;
  50. this.notifyAll();
  51. }
  52. }
  53. @Override
  54. public int getUnarrived() {
  55. // 返回有多少线程还未完成任务
  56. return limit;
  57. }
  58. }