闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。
闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束时,这扇门会打开并允许所有的线程通过。
闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,比如:

  1. 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
  2. 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
  3. 等待直到某个操作的所有参与者都就绪再继续执行。

    CountDownLatch实现原理

    CountDownLatch 是一种灵活的闭锁实现。
    闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生了,而 await 方法等待计数器达到零,这表示所有需要等待的事件都已经发生。
    如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
    代码实现:

    1. public class Test {
    2. public static void main(String[] args) {
    3. final CountDownLatch latch = new CountDownLatch(2);
    4. new Thread(){
    5. public void run() {
    6. try {
    7. System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
    8. Thread.sleep(3000);
    9. System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
    10. latch.countDown();
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. };
    15. }.start();
    16. new Thread(){
    17. public void run() {
    18. try {
    19. System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
    20. Thread.sleep(3000);
    21. System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
    22. latch.countDown();
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. };
    27. }.start();
    28. try {
    29. System.out.println("等待2个子线程执行完毕...");
    30. latch.await();
    31. System.out.println("2个子线程已经执行完毕");
    32. System.out.println("继续执行主线程");
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. }
    36. }
    37. }

    执行结果:

    1. 线程Thread-0正在执行
    2. 线程Thread-1正在执行
    3. 等待2个子线程执行完毕...
    4. 线程Thread-0执行完毕
    5. 线程Thread-1执行完毕
    6. 2个子线程已经执行完毕
    7. 继续执行主线程

    FutureTask

    FutureTask 也可以用作闭锁。
    FutureTask 表示的计算是通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以用处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。
    Future.get 的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。
    代码实现:

    1. public class MyCallable implements Callable<String> {
    2. private long waitTime;
    3. public MyCallable(int timeInMillis){
    4. this.waitTime=timeInMillis;
    5. }
    6. @Override
    7. public String call() throws Exception {
    8. Thread.sleep(waitTime);
    9. //return the thread name executing this callable task
    10. return Thread.currentThread().getName();
    11. }
    12. }
    1. public class FutureTaskExample {
    2. public static void main(String[] args) {
    3. MyCallable callable1 = new MyCallable(1000);
    4. MyCallable callable2 = new MyCallable(2000);
    5. FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
    6. FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
    7. ExecutorService executor = Executors.newFixedThreadPool(2);
    8. executor.execute(futureTask1);
    9. executor.execute(futureTask2);
    10. while (true) {
    11. try {
    12. if(futureTask1.isDone() && futureTask2.isDone()){
    13. System.out.println("Done");
    14. //shut down executor service
    15. executor.shutdown();
    16. return;
    17. }
    18. if(!futureTask1.isDone()){
    19. //wait indefinitely for future task to complete
    20. System.out.println("FutureTask1 output="+futureTask1.get());
    21. }
    22. System.out.println("Waiting for FutureTask2 to complete");
    23. String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
    24. if(s !=null){
    25. System.out.println("FutureTask2 output="+s);
    26. }
    27. } catch (InterruptedException | ExecutionException e) {
    28. e.printStackTrace();
    29. }catch(TimeoutException e){
    30. //do nothing
    31. }
    32. }
    33. }
    34. }

    运行结果:

    1. FutureTask1 output=pool-1-thread-1
    2. Waiting for FutureTask2 to complete
    3. Waiting for FutureTask2 to complete
    4. Waiting for FutureTask2 to complete
    5. Waiting for FutureTask2 to complete
    6. Waiting for FutureTask2 to complete
    7. FutureTask2 output=pool-1-thread-2
    8. Done

    信号量

    计数信号量(Counting Semaphore)控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。
    计数信号量还可以用来实现某种资源池,或者对容器施加边界。
    Semaphore 中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。
    代码实现:

    1. public class Test {
    2. public static void main(String[] args) {
    3. int N = 8; //工人数
    4. Semaphore semaphore = new Semaphore(5); //机器数目
    5. for(int i=0;i<N;i++)
    6. new Worker(i,semaphore).start();
    7. }
    8. static class Worker extends Thread{
    9. private int num;
    10. private Semaphore semaphore;
    11. public Worker(int num,Semaphore semaphore){
    12. this.num = num;
    13. this.semaphore = semaphore;
    14. }
    15. @Override
    16. public void run() {
    17. try {
    18. semaphore.acquire();
    19. System.out.println("工人"+this.num+"占用一个机器在生产...");
    20. Thread.sleep(2000);
    21. System.out.println("工人"+this.num+"释放出机器");
    22. semaphore.release();
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. }
    28. }

    结果:

    1. 工人0占用一个机器在生产...
    2. 工人1占用一个机器在生产...
    3. 工人2占用一个机器在生产...
    4. 工人4占用一个机器在生产...
    5. 工人5占用一个机器在生产...
    6. 工人0释放出机器
    7. 工人2释放出机器
    8. 工人3占用一个机器在生产...
    9. 工人7占用一个机器在生产...
    10. 工人4释放出机器
    11. 工人5释放出机器
    12. 工人1释放出机器
    13. 工人6占用一个机器在生产...
    14. 工人3释放出机器
    15. 工人7释放出机器
    16. 工人6释放出机器

    栅栏

    闭锁是一次性对象,一旦进入终止状态,就不能被重置。
    栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
    闭锁用于等待事件,而栅栏用于等待其他线程。
    当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。

    比如旅行团出行,6点所有人集合于麦当劳门口,7点清点人数,准备发车。这里就需要等到所有人都到齐之后再进行下一步的讨论。

如果对 await 的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException
如果成功地通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
代码实现:

  1. public class Test {
  2. public static void main(String[] args) {
  3. int N = 4;
  4. CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
  5. @Override
  6. public void run() {
  7. System.out.println("当前线程"+Thread.currentThread().getName());
  8. }
  9. });
  10. for(int i=0;i<N;i++)
  11. new Writer(barrier).start();
  12. }
  13. static class Writer extends Thread{
  14. private CyclicBarrier cyclicBarrier;
  15. public Writer(CyclicBarrier cyclicBarrier) {
  16. this.cyclicBarrier = cyclicBarrier;
  17. }
  18. @Override
  19. public void run() {
  20. System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
  21. try {
  22. Thread.sleep(5000); //以睡眠来模拟写入数据操作
  23. System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
  24. cyclicBarrier.await();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }catch(BrokenBarrierException e){
  28. e.printStackTrace();
  29. }
  30. System.out.println("所有线程写入完毕,继续处理其他任务...");
  31. }
  32. }
  33. }

结果:

  1. 线程Thread-0正在写入数据...
  2. 线程Thread-1正在写入数据...
  3. 线程Thread-2正在写入数据...
  4. 线程Thread-3正在写入数据...
  5. 线程Thread-0写入数据完毕,等待其他线程写入完毕
  6. 线程Thread-1写入数据完毕,等待其他线程写入完毕
  7. 线程Thread-2写入数据完毕,等待其他线程写入完毕
  8. 线程Thread-3写入数据完毕,等待其他线程写入完毕
  9. 当前线程Thread-3
  10. 所有线程写入完毕,继续处理其他任务...
  11. 所有线程写入完毕,继续处理其他任务...
  12. 所有线程写入完毕,继续处理其他任务...
  13. 所有线程写入完毕,继续处理其他任务...

从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

CompletableFuture

使用 supplyAsync() 运行一个异步任务并且返回结果。
当任务不需要返回任何东西的时候, CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样?
CompletableFuture.supplyAsync() 就是你的选择。它持有 supplier<T> 并且返回 CompletableFuture<T>,T 是通过调用 传入的supplier取得的值的类型。
代码实现:

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2. try {
  3. TimeUnit.SECONDS.sleep(1);
  4. } catch (InterruptedException e) {
  5. throw new IllegalStateException(e);
  6. }
  7. return "Result of the asynchronous computation";
  8. });
  9. // Block and get the result of the Future
  10. String result = future.get();
  11. System.out.println(result);

参考资料

《Java并发编程实战》
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
Java FutureTask Example Program
Java 8 CompletableFuture 教程