1. Callable

有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run() 完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口。

  • 为了实现Runnable,需要实现不返回任何内容的run() 方法,而对于Callable,需要实现在完成时返回结果的call() 方法。请注意,不能使用Callable创建线程,只能使用Runnable创建线程。
  • 另一个区别是call() 方法可以引发异常,而run() 则不能。
  • 为实现Callable而必须重写call方法。
  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. import lombok.SneakyThrows;
  5. import java.text.SimpleDateFormat;
  6. import java.util.ArrayList;
  7. import java.util.Date;
  8. import java.util.List;
  9. import java.util.Random;
  10. import java.util.concurrent.*;
  11. /**
  12. * 启动11个线程, 10个线程计算从0至100的累加和,第11个线程计算10个线程的结果和并输出。
  13. * 请使用Callable和Thread分别实现
  14. */
  15. public class CallableAndThread {
  16. public static void main(String[] args) throws InterruptedException, ExecutionException {
  17. System.out.println(new CallableExample().test());
  18. System.out.println(new ThreadExample().test());
  19. }
  20. }
  21. /**
  22. * CallableExample
  23. */
  24. class CallableExample{
  25. /**
  26. * 使用Executors.newFixedThreadPool创建线程池
  27. * @return
  28. * @throws ExecutionException
  29. * @throws InterruptedException
  30. */
  31. public int test() throws ExecutionException, InterruptedException {
  32. //线程池内线程数量
  33. int TASK_COUNT = 10;
  34. //线程池存储线程
  35. ExecutorService pool = Executors.newFixedThreadPool(TASK_COUNT);
  36. //结果集收集结果
  37. List<Future<Integer>> resultFutureList = new ArrayList<Future<Integer>>();
  38. //执行线程,收集结果
  39. for (int i = 0; i < TASK_COUNT; i++) {
  40. MyCallable myCallable = new MyCallable("线程"+i);
  41. Future<Integer> future = pool.submit(myCallable);
  42. resultFutureList.add(future);
  43. }
  44. //计算总和
  45. Future<Integer> sumFuture =pool.submit(new Callable<Integer>() {
  46. @Override
  47. public Integer call() throws Exception {
  48. int sum = 0;
  49. for (int i = 0; i < TASK_COUNT; sum += resultFutureList.get(i).get(), i++);
  50. return sum;
  51. }
  52. });
  53. //线程池关闭
  54. pool.shutdown();
  55. return sumFuture.get();
  56. }
  57. /**
  58. * 实现Callable接口
  59. */
  60. @Data
  61. @AllArgsConstructor
  62. @NoArgsConstructor
  63. private static class MyCallable implements Callable<Integer>{
  64. //线程名称
  65. private String threadName;
  66. @Override
  67. public Integer call() throws Exception {
  68. Random random = new Random();
  69. int sleepTimes = random.nextInt(10);
  70. System.out.println(threadName+"睡眠"+sleepTimes+"秒..., 当前时间"+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
  71. TimeUnit.SECONDS.sleep(sleepTimes);
  72. Integer sum = 0;
  73. for (int i = 0; i <= 100; sum += i, i++);
  74. System.out.println(threadName+"计算结果"+sum+",完成时间:"+new SimpleDateFormat("HH:mm:ss").format(new Date()));
  75. return sum;
  76. }
  77. }
  78. }
  79. /**
  80. * ThreadExample
  81. */
  82. class ThreadExample{
  83. /**
  84. * 线程直接使用new Thread来创建
  85. * @return
  86. */
  87. public int test() throws InterruptedException {
  88. int THREAD_COUNT = 10;
  89. MyRunnable[] myRunnables = new MyRunnable[THREAD_COUNT];
  90. for (int i = 0; i < THREAD_COUNT; i++) {
  91. myRunnables[i] = new MyRunnable();
  92. new Thread(myRunnables[i],"线程"+i).start();
  93. }
  94. //收集结果
  95. int sum = 0;
  96. for (int i = 0; i < THREAD_COUNT; sum += myRunnables[i].get().intValue(), i++);
  97. return sum;
  98. }
  99. /**
  100. * 实现Runnable
  101. */
  102. @Data
  103. public static class MyRunnable implements Runnable{
  104. private Integer sum = 0;
  105. @SneakyThrows
  106. @Override
  107. public void run() {
  108. Random random = new Random();
  109. int sleepTimes = random.nextInt(10);
  110. System.out.println(Thread.currentThread().getName()+"睡眠"+sleepTimes+"秒..., 当前时间"+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
  111. TimeUnit.SECONDS.sleep(sleepTimes);
  112. for (int i = 0; i <= 100; this.sum += i, i++);
  113. System.out.println(Thread.currentThread().getName()+"计算结果"+sum+",完成时间:"+new SimpleDateFormat("HH:mm:ss").format(new Date()));
  114. //唤醒其他线程
  115. synchronized(this){
  116. notifyAll();
  117. }
  118. }
  119. /**
  120. * 同步方法获取结果值
  121. * @return
  122. * @throws InterruptedException
  123. */
  124. public synchronized Integer get() throws InterruptedException {
  125. while (this.sum == 0){
  126. wait();
  127. }
  128. return this.sum;
  129. }
  130. }
  131. }

Callable 与Thread的区别:

  1. 最大的区别是Callable有返回值,而Thread没有。
  2. Callable必须配合线程池和Future类使用。Thread不需要,它自己可以单独使用。
  3. Callable不能创建线程,但是Thread可以。

说到底: Callable就是为了线程池准备的,而Thread就是为了独立运行使用。

2. CompletableFuture

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.ToString;
  4. import java.util.ArrayList;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.Random;
  8. import java.util.concurrent.CompletableFuture;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * 假设你能提供这样一种服务:
  12. * 给你一个渠道列表, 里面包含各种电商的渠道, 你要从这些渠道里面获取某个商品的价格,然后找出最低的价格打印出来。
  13. */
  14. public class CompletableFutureDemo {
  15. //渠道列表
  16. private static List<Channel> channelList = Collections.synchronizedList(new ArrayList<>());
  17. static {
  18. channelList.add(new Channel("淘宝",1.00));
  19. channelList.add(new Channel("天猫",2.00));
  20. channelList.add(new Channel("拼多多",3.00));
  21. channelList.add(new Channel("美团",4.00));
  22. }
  23. /**
  24. * 随机睡眠一段时间,模拟网络请求爬取数据
  25. */
  26. private static void delay() throws InterruptedException {
  27. int delayTime = new Random().nextInt(5000);
  28. TimeUnit.MILLISECONDS.sleep(delayTime);
  29. System.out.println("线程睡眠了"+delayTime+"毫秒");
  30. }
  31. public static void main(String[] args) {
  32. long start = System.currentTimeMillis();
  33. CompletableFuture<Double>[] completableFutures = new CompletableFuture[channelList.size()];
  34. for (int i = 0; i < channelList.size(); i++) {
  35. int j = i;
  36. completableFutures[i] = CompletableFuture.supplyAsync(() -> {
  37. try {
  38. delay();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. return channelList.get(j).getPrice();
  43. });
  44. }
  45. CompletableFuture.allOf(completableFutures).join();
  46. long end = System.currentTimeMillis();
  47. System.out.println("耗时"+(end-start)+"毫秒");
  48. }
  49. /**
  50. * 渠道类
  51. */
  52. @Data
  53. @AllArgsConstructor
  54. @ToString
  55. private static class Channel{
  56. private String name; //渠道名字
  57. private Double price; //商品在该渠道内的价格
  58. }
  59. }

3. ThreadPoolExecutor

3.1 ThreadPoolExecutor 继承结构

image.png

3.2 ThreadPoolExecutor 的7个构造参数

  1. /**
  2. * Creates a new {@code ThreadPoolExecutor} with the given initial
  3. * parameters.
  4. *
  5. * @param corePoolSize the number of threads to keep in the pool, even
  6. * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  7. * 核心线程数量:
  8. * @param maximumPoolSize the maximum number of threads to allow in the
  9. * pool
  10. * 最大线程数量:
  11. * @param keepAliveTime when the number of threads is greater than
  12. * the core, this is the maximum time that excess idle threads
  13. * will wait for new tasks before terminating.
  14. * 超时时间:线程池内不是核心线程的线程运行完毕后的等待时间, 超出后自行销毁。
  15. * @param unit the time unit for the {@code keepAliveTime} argument
  16. * 超时时间单位:
  17. * @param workQueue the queue to use for holding tasks before they are
  18. * executed. This queue will hold only the {@code Runnable}
  19. * tasks submitted by the {@code execute} method.
  20. * 工作队列: 必须是阻塞队列
  21. * @param threadFactory the factory to use when the executor
  22. * creates a new thread
  23. * 线程工厂:JDK自带默认Executors.defaultThreadFactory()
  24. * @param handler the handler to use when execution is blocked
  25. * because the thread bounds and queue capacities are reached
  26. * 拒绝策略:JDK自带4个拒绝策略,默认为Abort抛异常策略
  27. * @throws IllegalArgumentException if one of the following holds:<br>
  28. * {@code corePoolSize < 0}<br>
  29. * {@code keepAliveTime < 0}<br>
  30. * {@code maximumPoolSize <= 0}<br>
  31. * {@code maximumPoolSize < corePoolSize}
  32. * @throws NullPointerException if {@code workQueue}
  33. * or {@code threadFactory} or {@code handler} is null
  34. */
  35. public ThreadPoolExecutor(int corePoolSize,
  36. int maximumPoolSize,
  37. long keepAliveTime,
  38. TimeUnit unit,
  39. BlockingQueue<Runnable> workQueue,
  40. ThreadFactory threadFactory,
  41. RejectedExecutionHandler handler)

3.3 ThreadPoolExecutor 模型

image.png

3.4 ThreadPoolExecutor 的4个特性

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.ThreadPoolExecutor;
  3. import java.util.concurrent.TimeUnit;
  4. public class ThreadPoolExecutorDemo {
  5. /**
  6. * 特性1: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队
  7. */
  8. private static void test1() throws InterruptedException {
  9. //核心线程2个, 最大线程3个, 超时60秒,工作队列容量1,
  10. ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));
  11. pool.execute(()->run("任务1",0,TimeUnit.SECONDS)); //执行任务1
  12. TimeUnit.SECONDS.sleep(2); //主线程睡眠2秒,保证线程1先执行完
  13. pool.execute(()->run("任务2",0,TimeUnit.SECONDS)); //执行任务2
  14. pool.shutdown(); //关闭线程池
  15. System.out.println("特性1: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队");
  16. }
  17. /**
  18. * 特性2: 当池中正在运行的线程数大于等于corePoolSize时,新插入的任务进入workQueue排队(如果workQueue长度允许),等待空闲线程来执行。
  19. */
  20. private static void test2() throws InterruptedException {
  21. //核心线程2个, 最大线程3个, 超时60秒,工作队列容量1,
  22. ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));
  23. pool.execute(()->run("任务1",2,TimeUnit.SECONDS)); //执行任务1
  24. pool.execute(()->run("任务2",2,TimeUnit.SECONDS)); //执行任务2
  25. pool.execute(()->run("任务3",0,TimeUnit.SECONDS)); //执行任务3
  26. pool.shutdown();//关闭线程池
  27. //从实验结果上看,任务3会等待任务2执行完之后,有了空闲线程,才会执行。并没有新建线程执行任务3,这时maximumPoolSize=3这个参数不起作用。
  28. System.out.println("特性2: 当池中正在运行的线程数大于等于corePoolSize时,新插入的任务进入workQueue排队(如果workQueue长度允许),等待空闲线程来执行。");
  29. }
  30. /**
  31. * 特性3: 当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。
  32. */
  33. private static void test3() throws InterruptedException {
  34. //核心线程2个, 最大线程3个, 超时60秒,工作队列容量2
  35. ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2));
  36. pool.execute(()->run("任务1",5,TimeUnit.SECONDS)); //执行任务1
  37. pool.execute(()->run("任务2",5,TimeUnit.SECONDS)); //执行任务2
  38. pool.execute(()->run("任务3",0,TimeUnit.SECONDS)); //执行任务3
  39. pool.execute(()->run("任务4",0,TimeUnit.SECONDS)); //执行任务4
  40. pool.execute(()->run("任务5",0,TimeUnit.SECONDS)); //执行任务5
  41. pool.shutdown();//关闭线程池
  42. //当任务4进入队列时发现队列的长度已经到了上限,所以无法进入队列排队,而此时正在运行的线程数(2)小于maximumPoolSize所以新建线程执行该任务。
  43. //创建的新线程消费了队列里面所有任务
  44. System.out.println("特性3: 当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。");
  45. }
  46. /**
  47. * 特性4:当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略(线程池默认的拒绝策略是抛异常)。
  48. */
  49. private static void test4() throws InterruptedException {
  50. //核心线程2个, 最大线程3个, 超时60秒,工作队列容量1, 调用者执行策略。
  51. ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
  52. new ArrayBlockingQueue<>(1),
  53. Executors.defaultThreadFactory(),
  54. new ThreadPoolExecutor.CallerRunsPolicy()); //调用者执行策略,溢出的任务由调用者(这里是main)执行。
  55. pool.execute(()->run("任务1",5,TimeUnit.SECONDS)); //执行任务1
  56. pool.execute(()->run("任务2",5,TimeUnit.SECONDS)); //执行任务2
  57. pool.execute(()->run("任务3",5,TimeUnit.SECONDS)); //执行任务3
  58. pool.execute(()->run("任务4",5,TimeUnit.SECONDS)); //执行任务4
  59. pool.execute(()->run("任务5",5,TimeUnit.SECONDS)); //执行任务5
  60. pool.shutdown();
  61. //当任务5加入时,队列达到上限,池内运行的线程数达到最大,故执行默认的拒绝策略,抛异常。
  62. System.out.println("特性4:当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略(线程池默认的拒绝策略是抛异常)。");
  63. }
  64. /**
  65. * 执行的任务
  66. * @param taskName 任务名称
  67. * @param sleepTime 睡眠时间
  68. * @param timeUnit 时间单位
  69. */
  70. private static void run(String taskName,long sleepTime,TimeUnit timeUnit){
  71. try {
  72. timeUnit.sleep(sleepTime);
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. }
  76. System.out.println(taskName+",执行线程"+Thread.currentThread().getName());
  77. }
  78. public static void main(String[] args) throws InterruptedException {
  79. // System.out.println("测试特性1");
  80. // test1();
  81. // System.out.println();
  82. // System.out.println("测试特性2");
  83. // test2();
  84. // System.out.println("测试特性3");
  85. // test3();
  86. // System.out.println("测试特性4");
  87. // test4();
  88. }
  89. }

结果:

测试特性1 线程pool-1-thread-1执行任务1 线程pool-1-thread-2执行任务2 特性1: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队

测试特性2 特性2: 当池中正在运行的线程数大于等于corePoolSize时,新插入的任务进入workQueue排队(如果workQueue长度允许),等待空闲线程来执行。 线程pool-1-thread-2执行任务2 线程pool-1-thread-1执行任务1 线程pool-1-thread-2执行任务3

测试特性3 特性3: 当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。 线程pool-1-thread-3执行任务5 线程pool-1-thread-3执行任务3 线程pool-1-thread-3执行任务4 线程pool-1-thread-1执行任务1 线程pool-1-thread-2执行任务2

测试特性4 线程pool-1-thread-2执行任务2 线程pool-1-thread-1执行任务1 线程pool-1-thread-3执行任务4 线程main执行任务5 特性4:当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略(线程池默认的拒绝策略是抛异常)。 线程pool-1-thread-2执行任务3

  1. 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队
  2. 当池中正在运行的线程数大于等于corePoolSize时,新插入的任务进入workQueue排队(如果workQueue长度允许),等待空闲线程来执行。
  3. 当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。
  4. 当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。

由此可见:任务的分配顺序是:核心线程->队列->最大线程->拒绝策略。

3.5 ThreadPoolExecutor 拒绝策略

JDK默认提供4种拒绝策略:

  1. new ThreadPoolExecutor.AbortPolicy(); //抛异常
  2. new ThreadPoolExecutor.CallerRunsPolicy(); //调用者处理任务
  3. new ThreadPoolExecutor.DiscardPolicy(); //直接扔掉
  4. new ThreadPoolExecutor.DiscardOldestPolicy(); //扔掉最老任务

自定义拒绝策略:
实现RejectedExecutionHandler接口

  1. /**
  2. * 自定义拒绝策略
  3. */
  4. public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
  5. @Override
  6. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  7. System.out.println("任务保存至kafka");
  8. }
  9. }

3.6 ThreadPoolExecutor 源码分析

1、常用变量的解释

  1. // 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
  4. private static final int COUNT_BITS = Integer.SIZE - 3;
  5. // 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
  6. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  7. // runState is stored in the high-order bits
  8. // 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
  9. private static final int RUNNING = -1 << COUNT_BITS;
  10. private static final int SHUTDOWN = 0 << COUNT_BITS;
  11. private static final int STOP = 1 << COUNT_BITS;
  12. private static final int TIDYING = 2 << COUNT_BITS;
  13. private static final int TERMINATED = 3 << COUNT_BITS;
  14. // Packing and unpacking ctl
  15. // 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
  16. private static int runStateOf(int c) { return c & ~CAPACITY; }
  17. // 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
  18. private static int workerCountOf(int c) { return c & CAPACITY; }
  19. // 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
  20. private static int ctlOf(int rs, int wc) { return rs | wc; }
  21. /*
  22. * Bit field accessors that don't require unpacking ctl.
  23. * These depend on the bit layout and on workerCount being never negative.
  24. */
  25. // 8. `runStateLessThan()`,线程池状态小于xx
  26. private static boolean runStateLessThan(int c, int s) {
  27. return c < s;
  28. }
  29. // 9. `runStateAtLeast()`,线程池状态大于等于xx
  30. private static boolean runStateAtLeast(int c, int s) {
  31. return c >= s;
  32. }

2、构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. // 基本类型参数校验
  9. if (corePoolSize < 0 ||
  10. maximumPoolSize <= 0 ||
  11. maximumPoolSize < corePoolSize ||
  12. keepAliveTime < 0)
  13. throw new IllegalArgumentException();
  14. // 空指针校验
  15. if (workQueue == null || threadFactory == null || handler == null)
  16. throw new NullPointerException();
  17. this.corePoolSize = corePoolSize;
  18. this.maximumPoolSize = maximumPoolSize;
  19. this.workQueue = workQueue;
  20. // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

3、提交执行task的过程

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get();
  25. // worker数量比核心线程数小,直接创建worker执行任务
  26. if (workerCountOf(c) < corePoolSize) {
  27. if (addWorker(command, true))
  28. return;
  29. c = ctl.get();
  30. }
  31. // worker数量超过核心线程数,任务直接进入队列
  32. if (isRunning(c) && workQueue.offer(command)) {
  33. int recheck = ctl.get();
  34. // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
  35. // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
  36. if (! isRunning(recheck) && remove(command))
  37. reject(command);
  38. // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
  39. else if (workerCountOf(recheck) == 0)
  40. addWorker(null, false);
  41. }
  42. // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
  43. // 这儿有3点需要注意:
  44. // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
  45. // 2. addWorker第2个参数表示是否创建核心线程
  46. // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
  47. else if (!addWorker(command, false))
  48. reject(command);
  49. }

4、addworker方法源码解析

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. // 外层自旋
  4. for (;;) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
  8. // (rs > SHUTDOWN) ||
  9. // (rs == SHUTDOWN && firstTask != null) ||
  10. // (rs == SHUTDOWN && workQueue.isEmpty())
  11. // 1. 线程池状态大于SHUTDOWN时,直接返回false
  12. // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
  13. // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
  14. // Check if queue empty only if necessary.
  15. if (rs >= SHUTDOWN &&
  16. ! (rs == SHUTDOWN &&
  17. firstTask == null &&
  18. ! workQueue.isEmpty()))
  19. return false;
  20. // 内层自旋
  21. for (;;) {
  22. int wc = workerCountOf(c);
  23. // worker数量超过容量,直接返回false
  24. if (wc >= CAPACITY ||
  25. wc >= (core ? corePoolSize : maximumPoolSize))
  26. return false;
  27. // 使用CAS的方式增加worker数量。
  28. // 若增加成功,则直接跳出外层循环进入到第二部分
  29. if (compareAndIncrementWorkerCount(c))
  30. break retry;
  31. c = ctl.get(); // Re-read ctl
  32. // 线程池状态发生变化,对外层循环进行自旋
  33. if (runStateOf(c) != rs)
  34. continue retry;
  35. // 其他情况,直接内层循环进行自旋即可
  36. // else CAS failed due to workerCount change; retry inner loop
  37. }
  38. }
  39. boolean workerStarted = false;
  40. boolean workerAdded = false;
  41. Worker w = null;
  42. try {
  43. w = new Worker(firstTask);
  44. final Thread t = w.thread;
  45. if (t != null) {
  46. final ReentrantLock mainLock = this.mainLock;
  47. // worker的添加必须是串行的,因此需要加锁
  48. mainLock.lock();
  49. try {
  50. // Recheck while holding lock.
  51. // Back out on ThreadFactory failure or if
  52. // shut down before lock acquired.
  53. // 这儿需要重新检查线程池状态
  54. int rs = runStateOf(ctl.get());
  55. if (rs < SHUTDOWN ||
  56. (rs == SHUTDOWN && firstTask == null)) {
  57. // worker已经调用过了start()方法,则不再创建worker
  58. if (t.isAlive()) // precheck that t is startable
  59. throw new IllegalThreadStateException();
  60. // worker创建并添加到workers成功
  61. workers.add(w);
  62. // 更新`largestPoolSize`变量
  63. int s = workers.size();
  64. if (s > largestPoolSize)
  65. largestPoolSize = s;
  66. workerAdded = true;
  67. }
  68. } finally {
  69. mainLock.unlock();
  70. }
  71. // 启动worker线程
  72. if (workerAdded) {
  73. t.start();
  74. workerStarted = true;
  75. }
  76. }
  77. } finally {
  78. // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
  79. if (! workerStarted)
  80. addWorkerFailed(w);
  81. }
  82. return workerStarted;
  83. }

5、线程池worker任务单元

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. /** Initial task to run. Possibly null. */
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. /**
  17. * Creates with given first task and thread from ThreadFactory.
  18. * @param firstTask the first task (null if none)
  19. */
  20. Worker(Runnable firstTask) {
  21. setState(-1); // inhibit interrupts until runWorker
  22. this.firstTask = firstTask;
  23. // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
  24. this.thread = getThreadFactory().newThread(this);
  25. }
  26. /** Delegates main run loop to outer runWorker */
  27. public void run() {
  28. runWorker(this);
  29. }
  30. // 省略代码...
  31. }

6、核心线程执行逻辑-runworker

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. // 调用unlock()是为了让外部可以中断
  6. w.unlock(); // allow interrupts
  7. // 这个变量用于判断是否进入过自旋(while循环)
  8. boolean completedAbruptly = true;
  9. try {
  10. // 这儿是自旋
  11. // 1. 如果firstTask不为null,则执行firstTask;
  12. // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
  13. // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
  14. while (task != null || (task = getTask()) != null) {
  15. // 这儿对worker进行加锁,是为了达到下面的目的
  16. // 1. 降低锁范围,提升性能
  17. // 2. 保证每个worker执行的任务是串行的
  18. w.lock();
  19. // If pool is stopping, ensure thread is interrupted;
  20. // if not, ensure thread is not interrupted. This
  21. // requires a recheck in second case to deal with
  22. // shutdownNow race while clearing interrupt
  23. // 如果线程池正在停止,则对当前线程进行中断操作
  24. if ((runStateAtLeast(ctl.get(), STOP) ||
  25. (Thread.interrupted() &&
  26. runStateAtLeast(ctl.get(), STOP))) &&
  27. !wt.isInterrupted())
  28. wt.interrupt();
  29. // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
  30. // 这两个方法在当前类里面为空实现。
  31. try {
  32. beforeExecute(wt, task);
  33. Throwable thrown = null;
  34. try {
  35. task.run();
  36. } catch (RuntimeException x) {
  37. thrown = x; throw x;
  38. } catch (Error x) {
  39. thrown = x; throw x;
  40. } catch (Throwable x) {
  41. thrown = x; throw new Error(x);
  42. } finally {
  43. afterExecute(task, thrown);
  44. }
  45. } finally {
  46. // 帮助gc
  47. task = null;
  48. // 已完成任务数加一
  49. w.completedTasks++;
  50. w.unlock();
  51. }
  52. }
  53. completedAbruptly = false;
  54. } finally {
  55. // 自旋操作被退出,说明线程池正在结束
  56. processWorkerExit(w, completedAbruptly);
  57. }
  58. }

4. JDK自带的线程池

4.1 单例线程池

  1. ExecutorService service = Executors.newSingleThreadExecutor();

单例线程池是一个只有1个线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
image.png

4.2 缓存线程池

  1. ExecutorService service = Executors.newCachedThreadPool();

缓存线程池是一个可缓存线程的线程池,如果线程池长度超过处理需求,可以灵活回收空闲线程,若无可回收则新建线程。
image.png

4.3 定长线程池

  1. final int cpuCoreNum = 4;
  2. ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

定长线程池可以控制线程最大并发数,超过的线程会在队列中等待。
image.png

4.4 定时计划线程池

  1. ScheduledExecutorService service = Executors.newScheduledThreadPool(4);

定时计划线程池支持定时及周期性任务执行。
image.png

5. ForkJoinPool

ForkJoinPool 适合于把一个大任务切分成很多很多的小任务来执行, 执行后汇总结果。

  • 分解汇总的任务
  • 用很少的线程可以执行很多的任务(子任务)TPE做不到先执行子任务
  • 适用于CPU密集型场景

5.1 WorkStealingPool

WorkStealingPoolThreadPoolExecutor不同的地方在于WorkStealingPool是每一个线程都维护了一个队列,如果一个线程执行完自己的队列的任务后,会从其他线程的尾部”一个任务过来执行。所以这个线程池叫做WorkStealingPool

  1. ExecutorService service = Executors.newWorkStealingPool();

6. 定义多少线程合适

9. 线程池 - 图7

  • Ncpu: 是CPU的数量,可以通过 _Runtime.getRuntime().availableProcessors();_ 获取
  • Ucpu:是期望CPU的使用率,比如期望CPU满载则 Ucpu = 1
  • W/C:CPU的等待时间/执行时间(Wait/Computation)