闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。
闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束时,这扇门会打开并允许所有的线程通过。
闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,比如:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
-
CountDownLatch实现原理
CountDownLatch 是一种灵活的闭锁实现。
闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生了,而 await 方法等待计数器达到零,这表示所有需要等待的事件都已经发生。
如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
代码实现:public class Test {public static void main(String[] args) {final CountDownLatch latch = new CountDownLatch(2);new Thread(){public void run() {try {System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");Thread.sleep(3000);System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");latch.countDown();} catch (InterruptedException e) {e.printStackTrace();}};}.start();new Thread(){public void run() {try {System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");Thread.sleep(3000);System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");latch.countDown();} catch (InterruptedException e) {e.printStackTrace();}};}.start();try {System.out.println("等待2个子线程执行完毕...");latch.await();System.out.println("2个子线程已经执行完毕");System.out.println("继续执行主线程");} catch (InterruptedException e) {e.printStackTrace();}}}
执行结果:
线程Thread-0正在执行线程Thread-1正在执行等待2个子线程执行完毕...线程Thread-0执行完毕线程Thread-1执行完毕2个子线程已经执行完毕继续执行主线程
FutureTask
FutureTask 也可以用作闭锁。
FutureTask 表示的计算是通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以用处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。
Future.get 的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。
代码实现:public class MyCallable implements Callable<String> {private long waitTime;public MyCallable(int timeInMillis){this.waitTime=timeInMillis;}@Overridepublic String call() throws Exception {Thread.sleep(waitTime);//return the thread name executing this callable taskreturn Thread.currentThread().getName();}}
public class FutureTaskExample {public static void main(String[] args) {MyCallable callable1 = new MyCallable(1000);MyCallable callable2 = new MyCallable(2000);FutureTask<String> futureTask1 = new FutureTask<String>(callable1);FutureTask<String> futureTask2 = new FutureTask<String>(callable2);ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(futureTask1);executor.execute(futureTask2);while (true) {try {if(futureTask1.isDone() && futureTask2.isDone()){System.out.println("Done");//shut down executor serviceexecutor.shutdown();return;}if(!futureTask1.isDone()){//wait indefinitely for future task to completeSystem.out.println("FutureTask1 output="+futureTask1.get());}System.out.println("Waiting for FutureTask2 to complete");String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);if(s !=null){System.out.println("FutureTask2 output="+s);}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}catch(TimeoutException e){//do nothing}}}}
运行结果:
FutureTask1 output=pool-1-thread-1Waiting for FutureTask2 to completeWaiting for FutureTask2 to completeWaiting for FutureTask2 to completeWaiting for FutureTask2 to completeWaiting for FutureTask2 to completeFutureTask2 output=pool-1-thread-2Done
信号量
计数信号量(
Counting Semaphore)控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。
计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。
代码实现:public class Test {public static void main(String[] args) {int N = 8; //工人数Semaphore semaphore = new Semaphore(5); //机器数目for(int i=0;i<N;i++)new Worker(i,semaphore).start();}static class Worker extends Thread{private int num;private Semaphore semaphore;public Worker(int num,Semaphore semaphore){this.num = num;this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire();System.out.println("工人"+this.num+"占用一个机器在生产...");Thread.sleep(2000);System.out.println("工人"+this.num+"释放出机器");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}}}
结果:
工人0占用一个机器在生产...工人1占用一个机器在生产...工人2占用一个机器在生产...工人4占用一个机器在生产...工人5占用一个机器在生产...工人0释放出机器工人2释放出机器工人3占用一个机器在生产...工人7占用一个机器在生产...工人4释放出机器工人5释放出机器工人1释放出机器工人6占用一个机器在生产...工人3释放出机器工人7释放出机器工人6释放出机器
栅栏
闭锁是一次性对象,一旦进入终止状态,就不能被重置。
栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
闭锁用于等待事件,而栅栏用于等待其他线程。
当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。比如旅行团出行,6点所有人集合于麦当劳门口,7点清点人数,准备发车。这里就需要等到所有人都到齐之后再进行下一步的讨论。
如果对 await 的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException。
如果成功地通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
代码实现:
public class Test {public static void main(String[] args) {int N = 4;CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {@Overridepublic void run() {System.out.println("当前线程"+Thread.currentThread().getName());}});for(int i=0;i<N;i++)new Writer(barrier).start();}static class Writer extends Thread{private CyclicBarrier cyclicBarrier;public Writer(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");try {Thread.sleep(5000); //以睡眠来模拟写入数据操作System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}System.out.println("所有线程写入完毕,继续处理其他任务...");}}}
结果:
线程Thread-0正在写入数据...线程Thread-1正在写入数据...线程Thread-2正在写入数据...线程Thread-3正在写入数据...线程Thread-0写入数据完毕,等待其他线程写入完毕线程Thread-1写入数据完毕,等待其他线程写入完毕线程Thread-2写入数据完毕,等待其他线程写入完毕线程Thread-3写入数据完毕,等待其他线程写入完毕当前线程Thread-3所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...
从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。
CompletableFuture
使用 supplyAsync() 运行一个异步任务并且返回结果。
当任务不需要返回任何东西的时候, CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样?CompletableFuture.supplyAsync() 就是你的选择。它持有 supplier<T> 并且返回 CompletableFuture<T>,T 是通过调用 传入的supplier取得的值的类型。
代码实现:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of the asynchronous computation";});// Block and get the result of the FutureString result = future.get();System.out.println(result);
参考资料
《Java并发编程实战》
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
Java FutureTask Example Program
Java 8 CompletableFuture 教程
