1.什么是Latch
在本章中,我们将介绍Latch(门阀) 设计模式,该模式指定了一个屏障,只有所有的条件都达到满足的时候,门阀才能打开。
2.CountDownLatch程序实现
2.1 无限等待的Latch
在代码中, 首先定义了一个无限等待的抽象类Latch, 在Latch抽象类中定义了 await方法、countDown方法以及getUnarrived方法, 这些方法的用途在代码注释中都有详细介绍,当然在Latch中的limit属性至关重要,当limit降低到0时门阀将会被打开
public abstract class Latch {
// 用于控制多少个线程完成任务时才能打开阀门
protected int limit;
// 通过构造函数传入limit
public Latch(int limit) {
this.limit = limit;
}
// 该方法会使得当前线程一直等待,直到所有的线程都完成工作被阻塞的线程是允许被中断的
public abstract void await() throws InterruptedException;
// 当任务线程完成工作之后调用该方法使得计数器减一
public abstract void countDown();
// 获取当前还有多少个线程没有完成任务
public abstract int getUnarrived();
}
子任务数量达到limit的时候,门阀才能打开,await() 方法用于等待所有的子任务完成,如果到达数量未达到limit的时候,将会无限等待下去,当子任务完成的时候调用countDown() 方法使计数器减少一个,表明我已经完成任务了,getUnarrived() 方法主要用于查询当前有多少个子任务还未结束。
1.无限等待CountDownLatch实现
public class CountDownLatch extends Latch{
public CountDownLatch(int limit) {
super(limit);
}
@Override
public void await() throws InterruptedException {
synchronized (this) {
// 当limit > 0时,当前线程进入堵塞状态
while(limit > 0 ) {
this.wait();
}
}
}
@Override
public void countDown() {
synchronized (this) {
if ( limit <= 0 )
throw new IllegalStateException("all of task already arrived");
// 使limit减一,并且通知阻塞线程
limit--;
this.notifyAll();
}
}
@Override
public int getUnarrived() {
// 返回有多少线程还未完成任务
return limit;
}
}
在上述代码中, await() 方法不断判断limit的数量, 大于0时门阀将不能打开, 需要持续等待直到limit数量为0为止; countDown() 方法调用之后会导致limit—操作, 并且通知wait中的线程再次判断limit的值是否等于0, 当limit被减少到了0以下, 则抛出状态非法的异常; getUnarrived() 获取当前还有多少个子任务未完成, 这个返回值并不一定就是准确的, 在多线程的情况下, 某个线程在获得Unarrived任务数量并且返回之后, 有可能limit又被减少, 因此getUnarrived() 是一个评估值。
2.程序测试齐心协力打开门阀
/*
线程
*/
public class ProgrammerTravel extends Thread{
// 门阀
private final Latch latch;
// 程序
private final String programmer;
// 交通工具
private final String transportation;
// 构造函数
public ProgrammerTravel(Latch latch, String programmer, String transportation) {
this.latch = latch;
this.programmer = programmer;
this.transportation = transportation;
}
@Override
public void run() {
try {
// 花费路上的时间
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
}
TimeUnit.SECONDS.sleep(Thread Local Random.current() .next Int(10) ) 子句在run方法中模拟每个人到达目的地所花费的时间,当他们分别到达目的地的时候,需要执行latch.countDown(),使计数器减少一个以标明自己已到达, 代码如下:
public class LatchClient {
public static void main(String[] args) throws InterruptedException {
// 定义Latch, limit 为 4
Latch latch = new CountDownLatch(4);
new ProgrammerTravel(latch, "Alex", "Bus");
new ProgrammerTravel(latch, "Gavin","Walking");
latch.await();
System.out.println("all arrived");
}
}
2.2 有超时设置的Latch
1.可超时的等待
在Latch中增加可超时的抽象方法await(TimeUnit unit, long time) 的示例代码如下:
public abstract void await(TimeUnit unit, long time) throws InterruptedException;
其中TimeUnit代表wait的时间单位,而time则是指定数量的时间单位,在该方法中又增加了WaitTimeoutException用于通知当前的等待已经超时,与之相关的代码如所示。
public class WaitTImeoutException extends Exception{
public WaitTImeoutException(String message) {
super(message);
}
}
超时功能实现
@Override
public void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {
if ( time <= 0 )
throw new IllegalArgumentException("the time is invalid");
long remainingNanos = unit.toNanos(time);
// 等待任务将在endNanos纳秒后超时
final long endNaos = System.nanoTime() + remainingNanos;
synchronized (this) {
while( limit > 0 ) {
// 如果超市则抛出WaitTimeoutException异常
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )
throw new WaitTImeoutException("the wait time over specify time.");
// 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
remainingNanos = endNaos - System.nanoTime();
}
}
}
2.收到超时通知
public class LatchClient2 {
public static void main(String[] args) {
Latch latch = new CountDownLatch(2);
new ProgrammerTravel(latch, "Alex", "Bus").start();
new ProgrammerTravel(latch, "Gavin","Walking").start();
try {
latch.await(TimeUnit.SECONDS, 5);
System.out.println("all arrived");
} catch (InterruptedException | WaitTImeoutException e) {
e.printStackTrace();
}
}
}
3.扩展功能
Latch的作用是为了等待所有子任务完成后再执行其他任务, 因此可以对Latch进行再次的扩展,增加回调接口用于运行所有子任务完成后的其他任务,增加了回调功能的CountDownLatch代码如下:
public class CountDownLatch extends Latch{
public CountDownLatch(int limit) {
super(limit);
}
private Runnable runnable;
public CountDownLatch(int limit, Runnable runnable) {
this(limit);
this.runnable = runnable;
}
@Override
public void await() throws InterruptedException {
synchronized (this) {
// 当limit > 0时,当前线程进入堵塞状态
while(limit > 0 ) {
this.wait();
}
}
if ( null != null ) {
runnable.run();
}
}
@Override
public void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {
if ( time <= 0 )
throw new IllegalArgumentException("the time is invalid");
long remainingNanos = unit.toNanos(time);
// 等待任务将在endNanos纳秒后超时
final long endNaos = System.nanoTime() + remainingNanos;
synchronized (this) {
while( limit > 0 ) {
// 如果超市则抛出WaitTimeoutException异常
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )
throw new WaitTImeoutException("the wait time over specify time.");
// 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
remainingNanos = endNaos - System.nanoTime();
}
}
if ( null != runnable ) {
runnable.run();
}
}
@Override
public void countDown() {
synchronized (this) {
if ( limit <= 0 )
throw new IllegalStateException("all of task already arrived");
// 使limit减一,并且通知阻塞线程
limit--;
this.notifyAll();
}
}
@Override
public int getUnarrived() {
// 返回有多少线程还未完成任务
return limit;
}
}