一、 Fork-Join
forkjoin.jpg

1.Fork/Join使用的标准范式

我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
1. RecursiveAction,用于没有返回结果的任务
2. RecursiveTask,用于有返回值的任务
task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。
join()和get方法当任务完成的时候返回计算结果。
forkjoin2.png

在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

  1. public class ForkJoinSum {
  2. /**
  3. * RecursiveTask 有返回值 RecurSiveAction没有返回值 同步的使用invoke 异步的使用submit execute
  4. */
  5. public static class SumTask extends RecursiveTask<Integer>{
  6. /**
  7. * 阈值 当拆分到这个程度 不再进行拆分 进行计算
  8. */
  9. private final static int ThRESHOLD= MakeArray.ARRAY_LENGTH/10;
  10. private int[] src;
  11. private int fromIndex;
  12. private int toIndex;
  13. public SumTask(int[] src, int fromIndex, int toIndex) {
  14. this.src = src;
  15. this.fromIndex = fromIndex;
  16. this.toIndex = toIndex;
  17. }
  18. @Override
  19. protected Integer compute() {
  20. /**
  21. * 判定任务大小是否到合适范围 开始到结束的防卫是否小于阈值 是 进行计算
  22. */
  23. if (toIndex-fromIndex<ThRESHOLD){
  24. System.out.println("fromIndex="+fromIndex+"-----toIndex="+toIndex);
  25. int count=0;
  26. for (int i=fromIndex;i<=toIndex;i++){
  27. try {
  28. Thread.sleep(1);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. count=count+src[i];
  33. }
  34. return count;
  35. }else {
  36. //如果大于阈值 则进行折半拆分
  37. int mid=(fromIndex+toIndex)/2;
  38. SumTask left=new SumTask(src,fromIndex,mid);
  39. SumTask right=new SumTask(src,mid+1,toIndex);
  40. invokeAll(left,right);
  41. return left.join()+right.join();
  42. }
  43. }
  44. }
  45. public static void main(String[] args){
  46. int[] src=MakeArray.makeArray();
  47. /**
  48. * new出池的实例
  49. */
  50. ForkJoinPool forkJoinPool=new ForkJoinPool();
  51. /**
  52. * new 出Task的实例
  53. */
  54. SumTask innerFind=new SumTask(src,0,src.length-1);
  55. long start=System.currentTimeMillis();
  56. forkJoinPool.invoke(innerFind);
  57. System.out.println(innerFind.join());
  58. System.out.println(System.currentTimeMillis()-start);
  59. }
  60. }

二、 CountDownLatch

闭锁,CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为初始任务的数量。每当完成了一个任务后,计数器的值就会减1(CountDownLatch.countDown()方法)。当计数器值到达0时,它表示所有的已经完成了任务,然后在闭锁上等待CountDownLatch.await()方法的线程就可以恢复执行任务。
应用场景:
实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了,例如处理excel中多个表单。
countDownLatch.png

/**
 * 共5个初始化子线程,6个闭锁扣除点,扣除完毕后主线程和业务子线程
 */
public class CountDownLatchTest {

    static CountDownLatch latch=new CountDownLatch(6);

    public static class InitThread implements Runnable{
        @Override
        public void run() {
            System.out.println("Thread_"+Thread.currentThread().getId()+"准备初始化工作");
            for (int i=0;i<2;i++){
                System.out.println(Thread.currentThread().getId()+"初始化"+i);
                latch.countDown();
            }
        }
    }

    /**
     * 业务线程  等待countDownLatch 未0
     */
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i=0;i<3;i++){
                System.out.println("Thread_"+Thread.currentThread().getId()+"dobussiness");
            }
        }
    }
    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread_"+Thread.currentThread().getId()+"加载1");
                latch.countDown();

                System.out.println("Thread_"+Thread.currentThread().getId()+"加载2");
                latch.countDown();
            }
        }).start();
        for (int i=0;i<3;i++){
            new Thread(new InitThread()).start();
        }
        try {
            latch.await();
            System.out.println("main 工作");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new BusiThread()).start();
    }
}

三、CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
CylicBarrier.png

CountDownLatch和CyclicBarrier辨析

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以反复使用。
CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。
同时,CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。

四、Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。应用场景Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
Semaphore还提供一些其他方法,具体如下。
•intavailablePermits():返回此信号量中当前可用的许可证数。
•intgetQueueLength():返回正在等待获取许可证的线程数。
•booleanhasQueuedThreads():是否有线程正在等待获取许可证。
•void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
•Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。
sem.png

五、Callable、Future和FutureTask

Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

future.png
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
futureTask.png
FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
guanxi.jpg

public class UseFuture {

    /**
     * 实现callable接口,允许有返回值
     */
    private static class UseCallable implements Callable<Integer>{

        private int sum;
        @Override
        public Integer call() throws Exception {

            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for (int i=0;i<5000;i++){
                if (Thread.currentThread().isInterrupted()){
                    System.out.println("Callable子线程计算任务中断");
                }
                sum=sum+i;
            }
            System.out.println("Callable子线程计算结束"+sum);
            return sum;
        }
    }

    public static void main(String[] args) throws Exception{
        UseCallable useCallable=new UseCallable();
        FutureTask<Integer> futureTask=new FutureTask<>(useCallable);
        Random random=new Random();
        new Thread(futureTask).start();
        if (random.nextInt(10)>50){
            System.out.println("reuslt="+futureTask.get());
        }else {
            System.out.println("zhongzhi");
            futureTask.cancel(true);
        }
        try {
            futureTask.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


    }