1. 线程池创建

线程池创建有三种:

  1. 自定义创建线程池
  2. 如果使用spring,可以使用spring自带的线程池
  3. 可以使用Executors框架体系(不推介使用,because 它使用的都是无界限队列,容易发生OOM)

1.1 自定义创建线程池

  1. @Bean
  2. public ThreadPoolExecutor echartExecutor() {
  3. log.info("ThreadPoolExecutor echartExecutor init ... ");
  4. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(9, 27, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000),
  5. Executors.defaultThreadFactory(), new AbortPolicy());
  6. threadPoolExecutor.prestartAllCoreThreads(); // 预启动所有核心线程
  7. log.info("ThreadPoolExecutor echartExecutor init completed !!! ");
  8. return threadPoolExecutor;

线程创建工厂也可以自己定义

  1. static class NameTreadFactory implements ThreadFactory {
  2. private final AtomicInteger mThreadNum = new AtomicInteger(1);
  3. @Override
  4. public Thread newThread(Runnable r) {
  5. Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
  6. System.out.println(t.getName() + " has been created");
  7. return t;
  8. }
  9. }

拒绝执行处理器也可以自己定义

  1. public static class MyIgnorePolicy implements RejectedExecutionHandler {
  2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  3. doLog(r, e);
  4. }
  5. private void doLog(Runnable r, ThreadPoolExecutor e) {
  6. // 可做日志记录等
  7. System.err.println( r.toString() + " rejected");
  8. // System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
  9. }
  10. }
  • corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
  • maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
  • keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue:工作队列,和上面示例代码的工作队列同义。
  • threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  • handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
    • CallerRunsPolicy:提交任务的线程自己去执行该任务。
    • AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。


1.2 spring自己的线程池

  1. @Bean(name = "threadPoolTaskExecutor")
  2. public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
  3. ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
  4. threadPool.setCorePoolSize(10);
  5. threadPool.setMaxPoolSize(60);
  6. threadPool.setKeepAliveSeconds(60);
  7. threadPool.setQueueCapacity(10000);
  8. threadPool.setWaitForTasksToCompleteOnShutdown(true);
  9. threadPool.setAwaitTerminationSeconds(60);
  10. threadPool.initialize();
  11. return threadPool;
  12. }

1.3 Executors工具框架体系创建线程池

  1. //创建固定线程数的线程池
  2. ExecutorService pool = Executors.newFixedThreadPool(taskSize);
  3. //创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)
  4. ExecutorService pool = new CachedThreadPool()
  5. //创建一个单线程化的Executor
  6. ExecutorService pool = new SingleThreadExecutor()
  7. //创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
  8. ExecutorService pool = new ScheduledThreadPool(int corePoolSize)

2. 使用线程池的注意事项

  1. 强烈建议使用有界队列,这也是不推荐使用Executors的原因(Executors使用的都是无界队列容易发生OOM)
  2. 使用有界队列,任务过多时候,容易发生拒绝策略,默认的拒绝策略是:throw RuntimeException. 默认拒绝策略要慎重使用,建议自定义自己的拒绝策略;
    并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

3. 获取线程返回结果

3.1 execute(Runnable r)方法没有返回值

ThreadPoolExecutorvoid execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。

3.2 3个submit()方法

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。

  1. // 提交 Runnable 任务
  2. Future<?> submit(Runnable task);
  3. // 提交 Callable 任务
  4. <T> Future<T> submit(Callable<T> task);
  5. // 提交 Runnable 任务及结果引用
  6. <T> Future<T> submit(Runnable task, T result);

你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

  1. // 取消任务
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 判断任务是否已取消
  4. boolean isCancelled();
  5. // 判断任务是否已结束
  6. boolean isDone();
  7. // 获得任务执行结果
  8. get();
  9. // 获得任务执行结果,支持超时
  10. get(long timeout, TimeUnit unit);

这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下。

  1. 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
  2. 提交 Callable 任务 submit(Callable<T> task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
  3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。

3.3 FutureTask工具类

下面我们再来介绍 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。

  1. FutureTask(Callable<V> callable);
  2. FutureTask(Runnable runnable, V result);

那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。

  1. // 创建 FutureTask
  2. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
  3. // 创建线程池
  4. ExecutorService es = Executors.newCachedThreadPool();
  5. // 提交 FutureTask
  6. es.submit(futureTask);
  7. // 获取计算结果
  8. Integer result = futureTask.get();

FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。

  1. // 创建 FutureTask
  2. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
  3. // 创建并启动线程
  4. Thread T1 = new Thread(futureTask);
  5. T1.start();
  6. // 获取计算结果
  7. Integer result = futureTask.get();

image.png

下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待

  1. // 创建任务 T2 的 FutureTask
  2. FutureTask<String> ft2
  3. = new FutureTask<>(new T2Task());
  4. // 创建任务 T1 的 FutureTask
  5. FutureTask<String> ft1
  6. = new FutureTask<>(new T1Task(ft2));
  7. // 线程 T1 执行任务 ft1
  8. Thread T1 = new Thread(ft1);
  9. T1.start();
  10. // 线程 T2 执行任务 ft2
  11. Thread T2 = new Thread(ft2);
  12. T2.start();
  13. // 等待线程 T1 执行结果
  14. System.out.println(ft1.get());
  15. // T1Task 需要执行的任务:
  16. // 洗水壶、烧开水、泡茶
  17. class T1Task implements Callable<String>{
  18. FutureTask<String> ft2;
  19. // T1 任务需要 T2 任务的 FutureTask
  20. T1Task(FutureTask<String> ft2){
  21. this.ft2 = ft2;
  22. }
  23. @Override
  24. String call() throws Exception {
  25. System.out.println("T1: 洗水壶...");
  26. TimeUnit.SECONDS.sleep(1);
  27. System.out.println("T1: 烧开水...");
  28. TimeUnit.SECONDS.sleep(15);
  29. // 获取 T2 线程的茶叶
  30. String tf = ft2.get();
  31. System.out.println("T1: 拿到茶叶:"+tf);
  32. System.out.println("T1: 泡茶...");
  33. return " 上茶:" + tf;
  34. }
  35. }
  36. // T2Task 需要执行的任务:
  37. // 洗茶壶、洗茶杯、拿茶叶
  38. class T2Task implements Callable<String> {
  39. @Override
  40. String call() throws Exception {
  41. System.out.println("T2: 洗茶壶...");
  42. TimeUnit.SECONDS.sleep(1);
  43. System.out.println("T2: 洗茶杯...");
  44. TimeUnit.SECONDS.sleep(2);
  45. System.out.println("T2: 拿茶叶...");
  46. TimeUnit.SECONDS.sleep(1);
  47. return " 龙井 ";
  48. }
  49. }
  50. // 一次执行结果:
  51. T1: 洗水壶...
  52. T2: 洗茶壶...
  53. T1: 烧开水...
  54. T2: 洗茶杯...
  55. T2: 拿茶叶...
  56. T1: 拿到茶叶: 龙井
  57. T1: 泡茶...
  58. 上茶: 龙井