Java Thread
创建线程的方式很简单,new Thread(() -> {...}),就是因为这么简单粗暴的方式,才带来了致命的问题。首先线程的创建和销毁都是很耗时很浪费性能的操作,用线程为了什么?为了就是异步,为了就是提升性能。简单的new三五个Thread还好,但是需要一千个线程呢?也用for循环new1000个Thread吗?用完在销毁掉。那这一千个线程的创建和销毁的性能是很糟糕的!

1、概念

线程池的核心思想就是:线程复用。也就是说线程用完后不销毁,放到池子里等着新任务的到来,反复利用N个线程来执行所有新老任务。这带来的开销只会是那N个线程的创建,而不是每来一个请求都带来一个线程的从生到死的过程。

2、参数

2.1、源码

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {}

2.2、解释

  • corePoolSize:核心线程数

    线程池在完成初始化之后,默认情况下,线程池中不会有任何线程,线程池会等有任务来的时候再去创建线程。核心线程创建出来后即使超出了线程保持的存活时间配置也不会销毁,核心线程只要创建就永驻了,就等着新任务进来进行处理。

  • maximumPoolSize:最大线程数

    核心线程忙不过来且任务存储队列满了的情况下,还有新任务进来的话就会继续开辟线程,但是也不是任意的开辟线程数量,线程数(包含核心线程)达到maximumPoolSize后就不会产生新线程了,就会执行拒绝策略。

  • keepAliveTime:线程保持的存活时间

    如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,那么这些多余的线程(超出核心线程数的那些线程)就会被回收。

  • unit:线程保持的存活时间单位

    比如:TimeUnit.MILLISECONDS、TimeUnit.SECONDS

  • workQueue:任务存储队列

    核心线程数满了后还有任务继续提交到线程池的话,就先进入workQueue。 workQueue通常情况下有如下选择: LinkedBlockingQueue:无界队列,意味着无限制,其实是有限制,大小是int的最大值。也可以自定义大小。 ArrayBlockingQueue:有界队列,可以自定义大小,到了阈值就开启新线程(不会超过maximumPoolSize)。 SynchronousQueue:Executors.newCachedThreadPool();默认使用的队列。也不算是个队列,他不没有存储元素的能力。 一般都采取LinkedBlockingQueue,因为他也可以设置大小,可以取代ArrayBlockingQueue有界队列。

  • threadFactory:当线程池需要新的线程时,会用threadFactory来生成新的线程

    默认采用的是DefaultThreadFactory,主要负责创建线程。newThread()方法。创建出来的线程都在同一个线程组且优先级也是一样的。

  • handler:拒绝策略,任务量超出线程池的配置限制或执行shutdown还在继续提交任务的话,会执行handler的逻辑。

    默认采用的是AbortPolicy,遇到上面的情况,线程池将直接采取直接拒绝策略,也就是直接抛出异常。RejectedExecutionException 线程池提供了4种策略,可以实现RejectedExecutionHandler接口来自定义策略

    | 类 | 策略 | | :—- | :—-: | | AbortPolicy | 丢弃任务,抛运行时异常(默认的处理策略) | | CallerRunsPolicy | 执行任务 | | DiscardPolicy | 忽视,什么都不会发生 | | DiscardOldestPolicy | 丢弃队列里最近的一个任务,并执行当前任务 |

3、原理

3.1、原理

  • 线程池刚启动的时候核心线程数为0
  • 丢任务给线程池的时候,线程池会新开启线程来执行这个任务
  • 如果线程数小于corePoolSize,即使工作线程处于空闲状态,也会创建一个新线程来执行新任务
  • 如果线程数大于或等于corePoolSize,则会将任务放到workQueue,也就是任务队列
  • 如果任务队列满了,且线程数小于maximumPoolSize,则会创建一个新线程来运行任务
  • 如果任务队列满了,且线程数大于或等于maximumPoolSize,则直接采取拒绝策略

    3.2、图解

    image.png

    3.3、举例

    线程池参数配置:核心线程5个,最大线程数10个,队列长度为100。
    那么线程池启动的时候不会创建任何线程,假设请求进来6个,则会创建5个核心线程来处理五个请求,另一个没被处理到的进入到队列。这时候有进来99个请求,线程池发现核心线程满了,队列还在空着99个位置,所以会进入到队列里99个,加上刚才的1个正好100个。这时候再次进来5个请求,线程池会再次开辟五个非核心线程来处理这五个请求。目前的情况是线程池里线程数是10个RUNNING状态的,队列里100个也满了。如果这时候又进来1个请求,则直接走拒绝策略。

    3.4、源码

    1. public void execute(Runnable command) {
    2. int c = ctl.get();
    3. // workerCountOf(c):工作线程数
    4. // worker数量比核心线程数小,直接创建worker执行任务
    5. if (workerCountOf(c) < corePoolSize) {
    6. // addWorker里面负责创建线程且执行任务
    7. if (addWorker(command, true))
    8. return;
    9. c = ctl.get();
    10. }
    11. // worker数量超过核心线程数,任务直接进入队列
    12. if (isRunning(c) && workQueue.offer(command)) {
    13. int recheck = ctl.get();
    14. // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
    15. // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
    16. if (! isRunning(recheck) && remove(command))
    17. reject(command);
    18. // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
    19. else if (workerCountOf(recheck) == 0)
    20. addWorker(null, false);
    21. }
    22. // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    23. // 这儿有3点需要注意:
    24. // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    25. // 2. addWorker第2个参数表示是否创建核心线程
    26. // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    27. else if (!addWorker(command, false))
    28. reject(command);
    29. }

    4、Executors

    4.1、概念

    首先这不是一个线程池,这是线程池的工具类,他能方便创建线程。但是阿里巴巴开发手册上说明不推荐用Executors创建线程池,推荐自定义线程池。这是因为Executors创建的任何一种线程池都可能引发不可预知的问题。

    4.2、固定线程数

    4.2.1、描述

    核心线程数和最大线程数是一样的,所以称之为固定线程数。
    其他参数配置默认为:永不超时(0ms),无界队列(LinkedBlockingQueue)、默认线程工厂(DefaultThreadFactory)、直接拒绝策略(AbortPolicy)。

    4.2.2、api

    Executors.newFixedThreadPool(n);

    4.2.3、demo

    1. import java.util.concurrent.ExecutorService;
    2. import java.util.concurrent.Executors;
    3. /**
    4. * Description: 创建2个线程来执行10个任务。
    5. *
    6. * @author TongWei.Chen 2020-07-09 21:28:34
    7. */
    8. public class ThreadPoolTest {
    9. public static void main(String[] args) {
    10. ExecutorService executorService = Executors.newFixedThreadPool(2);
    11. for (int i = 0; i < 10; i++) {
    12. // 从结果中可以发现线程name永远都是两个。不会有第三个。
    13. executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
    14. }
    15. }
    16. }

    4.2.4、问题

    问题就在于它是无界队列,队列里能放int的最大值个任务,并发巨高的情况下极大可能直接OOM了然后任务还在堆积,毕竟直接用的是jvm内存。所以建议自定义线程池,自己按照需求指定合适的队列大小,自定义拒绝策略将超出队列大小的任务放到对外内存做补偿,比如Redis。别把业务系统压垮就行。

    4.2.5、源码

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(
    3. // 核心线程数和最大线程数都是nThreads
    4. nThreads, nThreads,
    5. 0L, TimeUnit.MILLISECONDS,
    6. // 无界队列!!!致命问题的关键所在。
    7. new LinkedBlockingQueue<Runnable>());
    8. }

    4.3、单个线程

    4.3.1、描述

    核心线程数和最大线程数是1,内部默认的,不可更改,所以称之为单线程数的线程池。
    类似于Executors.newFixedThreadPool(1);
    其他参数配置默认为:永不超时(0ms),无界队列(LinkedBlockingQueue)、默认线程工厂(DefaultThreadFactory)、直接拒绝策略(AbortPolicy)。

    4.3.2、api

    Executors.newSingleThreadExecutor();

    4.3.3、demo

    1. import java.util.concurrent.ExecutorService;
    2. import java.util.concurrent.Executors;
    3. /**
    4. * Description: 创建1个线程来执行10个任务。
    5. *
    6. * @author TongWei.Chen 2020-07-09 21:28:34
    7. */
    8. public class ThreadPoolTest {
    9. public static void main(String[] args) {
    10. ExecutorService executorService = Executors.newSingleThreadExecutor();
    11. for (int i = 0; i < 10; i++) {
    12. // 从结果中可以发现线程name永远都是pool-1-thread-1。不会有第二个出现。
    13. executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
    14. }
    15. }
    16. }

    4.3.4、问题

    同【4.2、固定线程数】的问题,都是无界队列的问题。

    4.3.5、源码

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(
    4. // 核心线程数和最大线程数都是1,写死的,客户端不可更改。
    5. 1, 1,
    6. 0L, TimeUnit.MILLISECONDS,
    7. // 无界队列!!!问题的关键所在。
    8. new LinkedBlockingQueue<Runnable>()));
    9. }

    4.4、带缓存的线程池

    4.4.1、描述

    他的功能是来个任务就开辟个线程去处理,不会进入队列,SynchronousQueue队列也不带存储元素的功能。那这意味着来一亿个请求就会开辟一亿个线程去处理,keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;这就叫带缓存功能的线程池。
    核心线程数是0,最大线程数是int的最大值,内部默认的,不可更改。
    其他参数配置默认为:1min超时(60s),SynchronousQueue队列、默认线程工厂(DefaultThreadFactory)、直接拒绝策略(AbortPolicy)。

    4.4.2、api

    Executors.newCachedThreadPool();

    4.4.3、demo

    1. import java.util.concurrent.ExecutorService;
    2. import java.util.concurrent.Executors;
    3. /**
    4. * Description: 创建个带缓存功能的线程池来执行10个任务。
    5. *
    6. * @author TongWei.Chen 2020-07-09 21:28:34
    7. */
    8. public class ThreadPoolTest {
    9. public static void main(String[] args) {
    10. ExecutorService executorService = Executors.newCachedThreadPool();
    11. for (int i = 0; i < 10; i++) {
    12. // 从结果中可以发现线程name有10个。也就是有几个任务就会开辟几个线程。
    13. executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
    14. }
    15. }
    16. }

    4.4.4、问题

    问题就在于他的最大线程数是int的最大值,因为他内部采取的队列是SynchronousQueue,这个队列没有容纳元素的能力,这将意味着只要来请求就开启线程去工作,巅峰期能创建二十几亿个线程出来工作,可以想象这是多么可怕!!!

    4.4.5、源码

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(
    3. // 核心线程数是0,最大线程数都是Integer.MAX_VALUE,这个可致命了!!!
    4. 0, Integer.MAX_VALUE,
    5. 60L, TimeUnit.SECONDS,
    6. new SynchronousQueue<Runnable>());
    7. }

    4.5、有调度功能的线程池

    4.5.1、描述

    RocketMQ内部大量采用了此种线程池来做心跳等任务。

核心线程数手动传进来,最大线程数是Integer.MAX_VALUE,最大线程数是内部默认的,不可更改。
其他参数配置默认为:永不超时(0ns),带延迟功能的队列(DelayedWorkQueue)、默认线程工厂(DefaultThreadFactory)、直接拒绝策略(AbortPolicy)。

4.5.2、api

Executors.newScheduledThreadPool(n);

4.5.3、demo

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. /**
  4. * Description: 创建个带调度功能的线程池来执行任务。
  5. *
  6. * @author TongWei.Chen 2020-07-09 21:28:34
  7. */
  8. public class ThreadPoolTest {
  9. public static void main(String[] args) {
  10. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
  11. // 五秒一次
  12. scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()), 5, TimeUnit.SECONDS);
  13. // 首次五秒后执行,其次每隔1s执行一次
  14. scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()), 5, 1, TimeUnit.SECONDS);
  15. }
  16. }

4.5.4、问题

【同4.4、带缓存的线程池的问题】
问题就在于他的最大线程数是int的最大值,这将意味海量并发期能创建二十几亿个线程出来工作,可以想象多么可怕!!!

4.5.5、源码

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. // 致命的问题跟newCachedThreadPool一样,最大线程数能开到几十亿(Integer.MAX_VALUE)!!!
  3. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  4. new DelayedWorkQueue());
  5. }

4.6、停止线程

4.6.1、shutdown

平缓的结束线程池,比如当前线程池还在执行任务,还没执行完,这时候执行了shutdown的话,线程池并不会立即停止工作,而是会等待线程池中的任务都执行完成后才会shutdown掉,但是如果执行shutdown了,外界还在继续提交任务到线程池,那么线程池会直接采取拒绝策略。

4.6.2、isShutdown

判断线程池是否已经shutdown。

4.6.3、shutdownNow

暴力结束线程池。不管当前线程池有没有任务在执行,队列里有没有堆积消息,都直接让线程池挂掉。但是他的返回值是队列里那些未被执行的任务。有需要的可以记录下log啥的。

4.7、疑问

这几种线程池为什么要采取不一样的队列?比如newFixedThreadPool为什么采取LinkedBlockingQueue,而newCachedThreadPool又为什么采取SynchronousQueue?
因为newFixedThreadPool线程数量有限,他又不想丢失任务,只能采取无界队列,而newCachedThreadPool的话本身自带int最大值个线程数,所以没必要用无界队列,他的宗旨就是有线程能处理,不需要队列。

5、总结几个问题

1、线程池的状态

  • RUNNING:接受新任务并处理排队任务。
  • SHUTDOWN:不接受新任务,但是会处理排队任务。【见:停止线程的4.6.1、shutdown】
  • STOP:不接受新任务,也不处理排队任务,并中端正在进行的任务。
  • TIDYING:所有任务都已经完事,工作线程为0的时候 ,线程会进入这个状态并执行terminate()钩子方法。
  • TERMINATED:terminate()钩子方法运行完成。

    2、线程池自动创建还是手动?

    肯定是手动了,因为Executors自动创建的那些线程池都存在致命的问题。手动创建线程池能自己控制线程数大小以及队列大小,还可以指定组名称等等个性化配置。重点不会出现致命问题,风险都把控在手里。

    3、线程数多少合适?

  • CPU密集型(比如加密、各种复杂计算等):建议设置为CPU核数+1。

  • 耗时IO操作(比如读写数据库,压缩解压缩大文件等等):一般会设置CPU核数的2倍。当然也有个很牛X的计算公式:线程数=CPU核数 *(1+平均等待时间/平均工作时间)

    4、before&after

    在线程执行前后可以通过两个方法来进行打印log或其他工作。
    源码如下:

    1. // 执行前的before
    2. beforeExecute(wt, task);
    3. Throwable thrown = null;
    4. try {
    5. // 真正执行
    6. task.run();
    7. } catch (RuntimeException x) {
    8. thrown = x; throw x;
    9. } catch (Error x) {
    10. thrown = x; throw x;
    11. } catch (Throwable x) {
    12. thrown = x; throw new Error(x);
    13. } finally {
    14. // 执行完成后after
    15. afterExecute(task, thrown);
    16. }

    6、核心源码(全)

    1、常用变量的解释

    1. // 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
    2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    3. // 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
    4. private static final int COUNT_BITS = Integer.SIZE - 3;
    5. // 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
    6. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    7. // runState is stored in the high-order bits
    8. // 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    9. private static final int RUNNING = -1 << COUNT_BITS;
    10. private static final int SHUTDOWN = 0 << COUNT_BITS;
    11. private static final int STOP = 1 << COUNT_BITS;
    12. private static final int TIDYING = 2 << COUNT_BITS;
    13. private static final int TERMINATED = 3 << COUNT_BITS;
    14. // Packing and unpacking ctl
    15. // 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
    16. private static int runStateOf(int c) { return c & ~CAPACITY; }
    17. // 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
    18. private static int workerCountOf(int c) { return c & CAPACITY; }
    19. // 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
    20. private static int ctlOf(int rs, int wc) { return rs | wc; }
    21. /*
    22. * Bit field accessors that don't require unpacking ctl.
    23. * These depend on the bit layout and on workerCount being never negative.
    24. */
    25. // 8. `runStateLessThan()`,线程池状态小于xx
    26. private static boolean runStateLessThan(int c, int s) {
    27. return c < s;
    28. }
    29. // 9. `runStateAtLeast()`,线程池状态大于等于xx
    30. private static boolean runStateAtLeast(int c, int s) {
    31. return c >= s;
    32. }

    2、构造方法

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) {
    8. // 基本类型参数校验
    9. if (corePoolSize < 0 ||
    10. maximumPoolSize <= 0 ||
    11. maximumPoolSize < corePoolSize ||
    12. keepAliveTime < 0)
    13. throw new IllegalArgumentException();
    14. // 空指针校验
    15. if (workQueue == null || threadFactory == null || handler == null)
    16. throw new NullPointerException();
    17. this.corePoolSize = corePoolSize;
    18. this.maximumPoolSize = maximumPoolSize;
    19. this.workQueue = workQueue;
    20. // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
    21. this.keepAliveTime = unit.toNanos(keepAliveTime);
    22. this.threadFactory = threadFactory;
    23. this.handler = handler;
    24. }

    3、提交执行task的过程

    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. /*
    5. * Proceed in 3 steps:
    6. *
    7. * 1. If fewer than corePoolSize threads are running, try to
    8. * start a new thread with the given command as its first
    9. * task. The call to addWorker atomically checks runState and
    10. * workerCount, and so prevents false alarms that would add
    11. * threads when it shouldn't, by returning false.
    12. *
    13. * 2. If a task can be successfully queued, then we still need
    14. * to double-check whether we should have added a thread
    15. * (because existing ones died since last checking) or that
    16. * the pool shut down since entry into this method. So we
    17. * recheck state and if necessary roll back the enqueuing if
    18. * stopped, or start a new thread if there are none.
    19. *
    20. * 3. If we cannot queue task, then we try to add a new
    21. * thread. If it fails, we know we are shut down or saturated
    22. * and so reject the task.
    23. */
    24. int c = ctl.get();
    25. // worker数量比核心线程数小,直接创建worker执行任务
    26. if (workerCountOf(c) < corePoolSize) {
    27. if (addWorker(command, true))
    28. return;
    29. c = ctl.get();
    30. }
    31. // worker数量超过核心线程数,任务直接进入队列
    32. if (isRunning(c) && workQueue.offer(command)) {
    33. int recheck = ctl.get();
    34. // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
    35. // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
    36. if (! isRunning(recheck) && remove(command))
    37. reject(command);
    38. // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
    39. else if (workerCountOf(recheck) == 0)
    40. addWorker(null, false);
    41. }
    42. // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    43. // 这儿有3点需要注意:
    44. // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    45. // 2. addWorker第2个参数表示是否创建核心线程
    46. // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    47. else if (!addWorker(command, false))
    48. reject(command);
    49. }

    4、addworker源码解析

    1. private boolean addWorker(Runnable firstTask, boolean core) {
    2. retry:
    3. // 外层自旋
    4. for (;;) {
    5. int c = ctl.get();
    6. int rs = runStateOf(c);
    7. // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
    8. // (rs > SHUTDOWN) ||
    9. // (rs == SHUTDOWN && firstTask != null) ||
    10. // (rs == SHUTDOWN && workQueue.isEmpty())
    11. // 1. 线程池状态大于SHUTDOWN时,直接返回false
    12. // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
    13. // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
    14. // Check if queue empty only if necessary.
    15. if (rs >= SHUTDOWN &&
    16. ! (rs == SHUTDOWN &&
    17. firstTask == null &&
    18. ! workQueue.isEmpty()))
    19. return false;
    20. // 内层自旋
    21. for (;;) {
    22. int wc = workerCountOf(c);
    23. // worker数量超过容量,直接返回false
    24. if (wc >= CAPACITY ||
    25. wc >= (core ? corePoolSize : maximumPoolSize))
    26. return false;
    27. // 使用CAS的方式增加worker数量。
    28. // 若增加成功,则直接跳出外层循环进入到第二部分
    29. if (compareAndIncrementWorkerCount(c))
    30. break retry;
    31. c = ctl.get(); // Re-read ctl
    32. // 线程池状态发生变化,对外层循环进行自旋
    33. if (runStateOf(c) != rs)
    34. continue retry;
    35. // 其他情况,直接内层循环进行自旋即可
    36. // else CAS failed due to workerCount change; retry inner loop
    37. }
    38. }
    39. boolean workerStarted = false;
    40. boolean workerAdded = false;
    41. Worker w = null;
    42. try {
    43. w = new Worker(firstTask);
    44. final Thread t = w.thread;
    45. if (t != null) {
    46. final ReentrantLock mainLock = this.mainLock;
    47. // worker的添加必须是串行的,因此需要加锁
    48. mainLock.lock();
    49. try {
    50. // Recheck while holding lock.
    51. // Back out on ThreadFactory failure or if
    52. // shut down before lock acquired.
    53. // 这儿需要重新检查线程池状态
    54. int rs = runStateOf(ctl.get());
    55. if (rs < SHUTDOWN ||
    56. (rs == SHUTDOWN && firstTask == null)) {
    57. // worker已经调用过了start()方法,则不再创建worker
    58. if (t.isAlive()) // precheck that t is startable
    59. throw new IllegalThreadStateException();
    60. // worker创建并添加到workers成功
    61. workers.add(w);
    62. // 更新`largestPoolSize`变量
    63. int s = workers.size();
    64. if (s > largestPoolSize)
    65. largestPoolSize = s;
    66. workerAdded = true;
    67. }
    68. } finally {
    69. mainLock.unlock();
    70. }
    71. // 启动worker线程
    72. if (workerAdded) {
    73. t.start();
    74. workerStarted = true;
    75. }
    76. }
    77. } finally {
    78. // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
    79. if (! workerStarted)
    80. addWorkerFailed(w);
    81. }
    82. return workerStarted;
    83. }

    5、线程池worker任务单元

    1. private final class Worker
    2. extends AbstractQueuedSynchronizer
    3. implements Runnable
    4. {
    5. /**
    6. * This class will never be serialized, but we provide a
    7. * serialVersionUID to suppress a javac warning.
    8. */
    9. private static final long serialVersionUID = 6138294804551838833L;
    10. /** Thread this worker is running in. Null if factory fails. */
    11. final Thread thread;
    12. /** Initial task to run. Possibly null. */
    13. Runnable firstTask;
    14. /** Per-thread task counter */
    15. volatile long completedTasks;
    16. /**
    17. * Creates with given first task and thread from ThreadFactory.
    18. * @param firstTask the first task (null if none)
    19. */
    20. Worker(Runnable firstTask) {
    21. setState(-1); // inhibit interrupts until runWorker
    22. this.firstTask = firstTask;
    23. // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
    24. this.thread = getThreadFactory().newThread(this);
    25. }
    26. /** Delegates main run loop to outer runWorker */
    27. public void run() {
    28. runWorker(this);
    29. }
    30. // 省略代码...
    31. }

    6、核心线程执行逻辑-runworker

    1. final void runWorker(Worker w) {
    2. Thread wt = Thread.currentThread();
    3. Runnable task = w.firstTask;
    4. w.firstTask = null;
    5. // 调用unlock()是为了让外部可以中断
    6. w.unlock(); // allow interrupts
    7. // 这个变量用于判断是否进入过自旋(while循环)
    8. boolean completedAbruptly = true;
    9. try {
    10. // 这儿是自旋
    11. // 1. 如果firstTask不为null,则执行firstTask;
    12. // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
    13. // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
    14. while (task != null || (task = getTask()) != null) {
    15. // 这儿对worker进行加锁,是为了达到下面的目的
    16. // 1. 降低锁范围,提升性能
    17. // 2. 保证每个worker执行的任务是串行的
    18. w.lock();
    19. // If pool is stopping, ensure thread is interrupted;
    20. // if not, ensure thread is not interrupted. This
    21. // requires a recheck in second case to deal with
    22. // shutdownNow race while clearing interrupt
    23. // 如果线程池正在停止,则对当前线程进行中断操作
    24. if ((runStateAtLeast(ctl.get(), STOP) ||
    25. (Thread.interrupted() &&
    26. runStateAtLeast(ctl.get(), STOP))) &&
    27. !wt.isInterrupted())
    28. wt.interrupt();
    29. // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
    30. // 这两个方法在当前类里面为空实现。
    31. try {
    32. beforeExecute(wt, task);
    33. Throwable thrown = null;
    34. try {
    35. task.run();
    36. } catch (RuntimeException x) {
    37. thrown = x; throw x;
    38. } catch (Error x) {
    39. thrown = x; throw x;
    40. } catch (Throwable x) {
    41. thrown = x; throw new Error(x);
    42. } finally {
    43. afterExecute(task, thrown);
    44. }
    45. } finally {
    46. // 帮助gc
    47. task = null;
    48. // 已完成任务数加一
    49. w.completedTasks++;
    50. w.unlock();
    51. }
    52. }
    53. completedAbruptly = false;
    54. } finally {
    55. // 自旋操作被退出,说明线程池正在结束
    56. processWorkerExit(w, completedAbruptly);
    57. }
    58. }

    7、自建线程池注意点

  • 阻塞任务队列数

  • 线程池的名字,最好跟业务相关
  • 核心线程池大小,看业务实际情况。
  • 最大线程池大小,看业务实际情况。
  • 拒绝策略,一般都是记录log,如果主要的业务会根据log做补偿。

比如:

  1. ThreadPoolExecutor executor = new ThreadPoolExecutor(CPU核数 + 1, 2 * CPU核数 + 1,
  2. 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
  3. // 线程池名字pay-account
  4. new DefaultThreadFactory("pay-account"), (r1, executor) -> {
  5. // 记录log 重新入队列做补偿
  6. });