1. 线程池基础介绍

1.1 什么是”池”?

可以理解为计划经济.

1.2 线程池适合应用的场合

  • 服务器接收到大量的请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率.
  • 实际上,开发中,如果需要创建5个线程以上,那么就可以使用线程池来管理了.

    2. 参数说明

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler)
参数 说明
corePoolSize 核心线程数量,线程池维护线程的最少数量
maximumPoolSize 线程池维护线程的最大数量
keepAliveTime 线程池除核心线程外的其他线程的最长空闲时间,超过该时间的空闲线程会被销毁
unit keepAliveTime的单位,TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
workQueue 线程池所使用的任务缓冲队列
threadFactory 线程工厂,用于创建线程,一般用默认的即可
handler 线程池对拒绝任务的处理策略

2.1 corePoolSize

  1. 核心线程数;
  2. 线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,
  3. 再创建新线程去执行任务。

2.2 maximumPoolSize

线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这个最大值就是 maxinumPoolSize

2.3 keepAliveTime

如果线程池当前的线程数 大于 corePoolSize, 那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。

2.3.1 allowCoreThreadTimeOut

这个设置就是说 如果设置为true ,核心线程数 corePoolSize 超过keepAliveTime 空闲也会被回收。

  1. /**
  2. * If false (default), core threads stay alive even when idle.
  3. * If true, core threads use keepAliveTime to time out waiting
  4. * for work.
  5. */
  6. private volatile boolean allowCoreThreadTimeOut;

2.4 unit

2.5 workQueue

2.5.1 三种常见的队列类型

2.5.1.1 SynchronousQueue 直接交接

2.5.1.2 LinkedBlockingQueue 无界队列

2.5.1.3 ArrayBlockingQueue 有界队列

2.6 threadFactory

  • 新的线程是由 ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程:

【1】 在同一个线程组
【2】 拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
【3】如果是自己指定的ThreadFactory ,那么就可以改变线程名,线程组,优先级,是否是守护线程等等。

2.7 handler 拒绝策略

2.7.1 AbortPolicy 中止策略

  1. /**
  2. * A handler for rejected tasks that throws a
  3. * {@code RejectedExecutionException}.
  4. */
  5. public static class AbortPolicy implements RejectedExecutionHandler {
  6. /**
  7. * Creates an {@code AbortPolicy}.
  8. */
  9. public AbortPolicy() { }
  10. /**
  11. * Always throws RejectedExecutionException.
  12. *
  13. * @param r the runnable task requested to be executed
  14. * @param e the executor attempting to execute this task
  15. * @throws RejectedExecutionException always
  16. */
  17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  18. throw new RejectedExecutionException("Task " + r.toString() +
  19. " rejected from " +
  20. e.toString());
  21. }
  22. }
  • 特性

    1. A handler for rejected tasks that throws a RejectedExecutionException.
    2. 只抛出异常,其余啥也不做. 抛弃异常
  • 特点: 直接抛出异常

    2.7.1.1 例子说明

    1. public class RunnableStyle {
    2. static int corePoolSize = 1;
    3. static int maximumPoolSize = 2;
    4. static int keepAliveTime = 2;
    5. static AtomicInteger count=new AtomicInteger();
    6. public static void main(String[] args) {
    7. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());
    8. for (int i = 0; i < 100; i++) {
    9. executor.execute(new MyRunnable());
    10. }
    11. executor.shutdown();
    12. }
    13. static class MyRunnable implements Runnable {
    14. @Override
    15. public void run() {
    16. try {
    17. System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
    18. Thread.sleep(3000L);
    19. System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. }
    25. }

    ```java Exception in thread “main” java.util.concurrent.RejectedExecutionException 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]执行完了任务

这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,并抛出异常。

  1. <a name="vxGcp"></a>
  2. ### 2.7.2 DiscardPolicy 丢弃策略
  3. ```java
  4. /**
  5. * A handler for rejected tasks that silently discards the
  6. * rejected task.
  7. */
  8. public static class DiscardPolicy implements RejectedExecutionHandler {
  9. /**
  10. * Creates a {@code DiscardPolicy}.
  11. */
  12. public DiscardPolicy() { }
  13. /**
  14. * Does nothing, which has the effect of discarding task r.
  15. *
  16. * @param r the runnable task requested to be executed
  17. * @param e the executor attempting to execute this task
  18. */
  19. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  20. }
  21. }
  • 特性

    1. 拒绝任务的处理程序,以静默方式丢弃被拒绝的任务.

    2.7.2.1 举例说明

    1. public class RunnableStyle {
    2. static int corePoolSize = 1;
    3. static int maximumPoolSize = 2;
    4. static int keepAliveTime = 2;
    5. static AtomicInteger count=new AtomicInteger();
    6. public static void main(String[] args) {
    7. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());
    8. for (int i = 0; i < 100; i++) {
    9. executor.execute(new MyRunnable());
    10. }
    11. executor.shutdown();
    12. }
    13. static class MyRunnable implements Runnable {
    14. @Override
    15. public void run() {
    16. try {
    17. System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
    18. Thread.sleep(3000L);
    19. System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. }
    25. }

    ```java 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:2 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:1 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:3 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:4 线程:Thread[pool-1-thread-2,5,main]执行完了任务 线程:Thread[pool-1-thread-1,5,main]执行完了任务

这个线程池 最大线程数量为2 ,队列长度为2 ,也就是最大能接受四个任务,其余任务直接抛弃,不抛出异常。

  1. <a name="7pMoG"></a>
  2. ### 2.7.3 CallerRunsPolicy
  3. <a name="a7tqq"></a>
  4. #### 2.7.3.1 拒绝策略代码
  5. ```cpp
  6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  7. if (!e.isShutdown()) {
  8. r.run();
  9. }
  10. }
  11. 将使用当前线程 执行 该任务,一般为主线程,主线程阻塞,新增的任务 阻塞在 计算机内存中。
  1. /**
  2. * A handler for rejected tasks that runs the rejected task
  3. * directly in the calling thread of the {@code execute} method,
  4. * unless the executor has been shut down, in which case the task
  5. * is discarded.
  6. */
  7. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  8. /**
  9. * Creates a {@code CallerRunsPolicy}.
  10. */
  11. public CallerRunsPolicy() { }
  12. /**
  13. * Executes task r in the caller's thread, unless the executor
  14. * has been shut down, in which case the task is discarded.
  15. * 在调用者的线程中(可以理解为 主线程)执行任务,除非
  16. *
  17. * @param r the runnable task requested to be executed
  18. * @param e the executor attempting to execute this task
  19. */
  20. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  21. if (!e.isShutdown()) {
  22. r.run();
  23. }
  24. }
  25. }

2.7.3.1 举例说明

  1. public class RunnableStyle {
  2. static int corePoolSize = 1;
  3. static int maximumPoolSize = 2;
  4. static int keepAliveTime = 2;
  5. static AtomicInteger count=new AtomicInteger();
  6. public static void main(String[] args) {
  7. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
  8. for (int i = 0; i < 100; i++) {
  9. executor.execute(new MyRunnable());
  10. }
  11. executor.shutdown();
  12. }
  13. static class MyRunnable implements Runnable {
  14. @Override
  15. public void run() {
  16. try {
  17. System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
  18. Thread.sleep(3000L);
  19. System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. }
  1. 线程不够用时,主线程也会参与执行任务,所以导致的问题就是 阻塞主线程;

2.7.4 DiscardOldestPolicy

2.7.4.1 拒绝策略代码

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. e.getQueue().poll();
  4. e.execute(r);
  5. }
  6. }
  7. 获取任务队列,将头部任务移除掉,将新任务加入队列中。
  1. package com.zhangyong.multiThread.multiThread.createThreads;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. /**
  7. * <p>Description: </p>
  8. * <p>Company: http://www.dmall.com</p>
  9. *
  10. * @author yong.zhang@dmall.com
  11. * @version 1.0.0
  12. * @date 2020/12/17 20:18
  13. */
  14. public class RunnableStyle {
  15. static int corePoolSize = 1;
  16. static int maximumPoolSize = 3;
  17. static int keepAliveTime = 2;
  18. static AtomicInteger count=new AtomicInteger();
  19. public static void main(String[] args) {
  20. ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());
  21. for (int i = 0; i < 100; i++) {
  22. executor.execute(new MyRunnable());
  23. }
  24. executor.shutdown();
  25. }
  26. static class MyRunnable implements Runnable {
  27. @Override
  28. public void run() {
  29. try {
  30. System.out.println("线程:" + Thread.currentThread() + "开始执行任务ID:" + count.incrementAndGet());
  31. Thread.sleep(3000L);
  32. System.out.println("线程:" + Thread.currentThread() + "执行完了任务");
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }
  1. 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:2
  2. 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:1
  3. 线程:Thread[pool-1-thread-3,5,main]开始执行任务ID:3
  4. 线程:Thread[pool-1-thread-2,5,main]执行完了任务
  5. 线程:Thread[pool-1-thread-1,5,main]执行完了任务
  6. 线程:Thread[pool-1-thread-2,5,main]开始执行任务ID:4
  7. 线程:Thread[pool-1-thread-1,5,main]开始执行任务ID:5
  8. 线程:Thread[pool-1-thread-3,5,main]执行完了任务
  9. 线程:Thread[pool-1-thread-2,5,main]执行完了任务
  10. 线程:Thread[pool-1-thread-1,5,main]执行完了任务;
  11. 新的任务加入到线程中后,如果没有被立刻执行的话,

特性:丢弃线程队列的旧任务,将新的任务添加进去.

3.线程池任务执行

3.1 添加线程规则

进阶篇-线程池 - 图1

  • 1. 如果线程数小于corePoolSize , 除非设置了allowCoreThreadTimeOut , 即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
  • 2. 如果线程数>= corePoolSize 但少于 maximumPoolSize ,则将任务放入队列。
  • 3. 如果队列已满,并且线程数小于maxPoolSize, 则创建一个新线程来运行任务。
  • [x] 4. 如果队列已满,并且线程数大于maxPoolSize,则使用设置的拒绝策略拒绝该任务。

    3.2 增减线程的特点

  • [x] 1. 通过设置corePoolSize 和 maximumPoolSize 相同,就可以创建固定大小的线程池。

  • 2. 线程池希望的是 永远保持较少的线程数,并且只有在负载变得很大的时候才去增大它。
  • 3. 通过设置maximumPoolSize 为很高的值,例如Integer.MAX_VALUE , 可以允许线程池容纳任意数量的并发任务。但是实际上硬件上的限制不会出现这种情况
  • 4. 只有在队列填满时才创建多余corePoolSize 的线程,所以如果你使用的是无界队列(LinkedBlockingQueue) , 那么线程数就不会超过corePoolSize;

    4. 线程池创建的方式

    4.1 Executors 自动创建线程池

    4.1.1 Executors.newFixedThreadPool()

    4.1.1.1 构造参数

    ```java public static ExecutorService newFixedThreadPool(int nThreads) {
    1. return new ThreadPoolExecutor(nThreads, nThreads,
    2. 0L, TimeUnit.MILLISECONDS,
    3. new LinkedBlockingQueue<Runnable>());
    }

corePoolSize 和 maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余corePoolSize的线程

  1. <a name="1WTkU"></a>
  2. #### 4.1.1.2 原理
  3. ```java
  4. public class FixedThreadPoolOOM {
  5. public static void main(String[] args) {
  6. ExecutorService ex = Executors.newFixedThreadPool(1);
  7. for (int i=0;i<Integer.MAX_VALUE;++i){
  8. ex.execute(new Task());
  9. }
  10. }
  11. }
  12. class Task implements Runnable {
  13. @Override
  14. public void run() {
  15. try {
  16. Thread.sleep(100000000000L);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  • 由于LinkedBlockingQueue 是没有容量上限的,所以当请求越来越多的时候,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,最终导致OOM.

    4.1.2 Executors.newSingleThreadExecutor()

    4.1.2.1 构造参数

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }
    7. corePoolSize maxPoolSize 相同, 那么keepAliveTime 这个参数就无意义了,因为他不考虑回收多余 corePoolSize的线程;

    4.1.2.2 原理

    1. 缺点和 newFixedThreadPool 一样.

    4.1.3 Executors.newCachedThreadPool();

    4.1.3.1 构造参数

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }
    6. corePoolSize 0
    7. maxPoolSize Int的最大值;
    8. 可回收线程;
    9. 队列采用的是Sync Queue ,它一种阻塞队列,其中每个 put 必须等待一个 take.

    4.1.3.2 原理

    1. maxPoolSize设置为Integer.MAX_VALUE, 这可能会创建数量很多的线程,甚至导致OOM.

    4.1.4 Executors.newScheduledThreadPool()

    定时执行

    4.1.4.1 构造参数

    4.1.4.2 原理

4.2 手动创建线程池

4.2.1 为啥要手动创建

4.2.2 手动创建

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }
  9. 自定义输入各参数
  1. /**
  2. * 用于异步的提交连接请求的线程池
  3. */
  4. private ThreadPoolExecutor threadPoolExecutor
  5. = new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1 << 16));

4.2.3 设置线程池中线程数量的策略

image.png
POP 订单批量导入导出中,消耗内存。

  1. 配置 coolPoolSize 公式:CPU核数 / (1- 阻塞系数) ---> 阻塞系数在0.8 ~ 0.9之间


例如:8核CPU: 8/1-0.9 = 80个线程数

引用

https://www.cnblogs.com/huangjuncong/p/10031525.html