一、自定义线程池

在看JDK的线程池之前,我们先来自定义一个线程池:

1.图解:

线程池 - 图1

  • 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
  • 主线程类似于生产者,产生任务并放入阻塞队列中
  • 线程池类似于消费者,得到阻塞队列中已有的任务并执行

2.具体实现

  1. package panw.ThreadPool;
  2. import javafx.concurrent.Worker;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.ArrayDeque;
  5. import java.util.Deque;
  6. import java.util.HashSet;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.locks.Condition;
  9. import java.util.concurrent.locks.ReentrantLock;
  10. @Slf4j
  11. public class ThreadPoolTest {
  12. public static void main(String[] args) {
  13. ThreadPool threadPool = new ThreadPool(1, 1, TimeUnit.SECONDS, 1,
  14. (queue,task)->{
  15. // queue.put(task); // 死等
  16. // queue.offer(task,50,TimeUnit.MILLISECONDS); //超时
  17. // log.debug("放弃{}",task); //放弃,后面任务继续会执行
  18. // throw new RuntimeException("任务执行失败"+task); //抛出异常,后面任务不执行
  19. // task.run(); //自己执行
  20. }
  21. );
  22. for (int i = 0; i < 15; i++) {
  23. int j=i;
  24. threadPool.execute(()->{
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. throw new RuntimeException(e);
  29. }
  30. log.debug("{}",j);
  31. });
  32. }
  33. }
  34. }
  35. @Slf4j
  36. class ThreadPool{
  37. private BlockingQueue<Runnable> queue ;
  38. private HashSet<Worker> workers =new HashSet<>();
  39. private int coreSize;
  40. private long timeout;
  41. private TimeUnit timeUnit;
  42. private RejectPolicy<Runnable> rejectPolicy;
  43. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int capacity,RejectPolicy<Runnable> rejectPolicy) {
  44. this.coreSize = coreSize;
  45. this.timeout = timeout;
  46. this.timeUnit = timeUnit;
  47. queue=new BlockingQueue<>(capacity);
  48. this.rejectPolicy=rejectPolicy;
  49. }
  50. public void execute(Runnable task){
  51. synchronized (workers){
  52. if (workers.size()<coreSize){
  53. Worker worker = new Worker(task);
  54. log.debug("新增 worker{}, {}", worker, task);
  55. workers.add(worker);
  56. worker.start();
  57. }else {
  58. // queue.put(task);
  59. queue.tryPut(rejectPolicy,task);
  60. }
  61. }
  62. }
  63. class Worker extends Thread{
  64. private Runnable task;
  65. public Worker(Runnable task) {
  66. this.task = task;
  67. }
  68. @Override
  69. public void run() {
  70. while (task!=null||(task=queue.poll(timeout,timeUnit))!=null){
  71. try {
  72. log.debug("正在执行...{}", task);
  73. task.run();
  74. }catch (Exception e){
  75. }finally {
  76. task=null;
  77. }
  78. }
  79. synchronized (workers){
  80. log.debug("worker 被移除{}", this);
  81. workers.remove(this);
  82. }
  83. }
  84. }
  85. }
  86. @Slf4j
  87. class BlockingQueue<T>{
  88. private Deque<T> queue ;
  89. private int capacity;
  90. private ReentrantLock lock;
  91. private Condition fullWait;
  92. private Condition emptyWait;
  93. public BlockingQueue(int capacity) {
  94. this.queue=new ArrayDeque<>();
  95. this.capacity=capacity;
  96. this.lock= new ReentrantLock();
  97. this.fullWait=lock.newCondition();
  98. this.emptyWait= lock.newCondition();
  99. }
  100. //阻塞添加
  101. public void put(T t){
  102. lock.lock();
  103. try {
  104. while (queue.size()==capacity){
  105. log.debug("等待加入任务队列 {}", t);
  106. fullWait.await();
  107. }
  108. log.debug("加入任务队列 {}", t);
  109. queue.addLast(t);
  110. emptyWait.signal();
  111. } catch (InterruptedException e) {
  112. throw new RuntimeException(e);
  113. } finally {
  114. lock.unlock();
  115. }
  116. }
  117. //带超时添加
  118. public boolean offer(T t,long timeout,TimeUnit timeUnit){
  119. lock.lock();
  120. long nanos = timeUnit.toNanos(timeout);
  121. try {
  122. while (queue.size()==capacity){
  123. if (nanos<=0){
  124. log.debug("超时了...删除{}",t);
  125. return false;
  126. }
  127. log.debug("等待加入任务队列 {} ...", t);
  128. nanos = fullWait.awaitNanos(nanos);
  129. }
  130. log.debug("加入任务队列 {}", t);
  131. queue.addLast(t);
  132. emptyWait.signal();
  133. return true;
  134. } catch (InterruptedException e) {
  135. throw new RuntimeException(e);
  136. } finally {
  137. lock.unlock();
  138. }
  139. }
  140. //阻塞获取
  141. public T take(){
  142. lock.lock();
  143. try {
  144. while (queue.isEmpty()){
  145. emptyWait.await();
  146. }
  147. T t = queue.removeFirst();
  148. fullWait.signal();
  149. return t;
  150. } catch (InterruptedException e) {
  151. throw new RuntimeException(e);
  152. } finally {
  153. lock.unlock();
  154. }
  155. }
  156. //带超时获取
  157. public T poll(long timeout, TimeUnit timeUnit){
  158. lock.lock();
  159. long nanos = timeUnit.toNanos(timeout);
  160. try {
  161. while (queue.isEmpty()){
  162. if (nanos<=0){
  163. return null;
  164. }
  165. nanos = emptyWait.awaitNanos(nanos);
  166. }
  167. T t = queue.removeFirst();
  168. fullWait.signal();
  169. return t;
  170. } catch (InterruptedException e) {
  171. throw new RuntimeException(e);
  172. } finally {
  173. lock.unlock();
  174. }
  175. }
  176. //容量
  177. public int size(){
  178. lock.lock();
  179. try {
  180. return queue.size();
  181. }finally {
  182. lock.unlock();
  183. }
  184. }
  185. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
  186. lock.lock();
  187. try {
  188. //判断队列是否已满
  189. if (queue.size()==capacity){
  190. rejectPolicy.reject(this,task);
  191. }else{
  192. log.debug("加入阻塞队列{}",task);
  193. queue.addLast(task);
  194. emptyWait.signal();
  195. }
  196. }finally {
  197. lock.unlock();
  198. }
  199. }
  200. }
  201. @FunctionalInterface
  202. interface RejectPolicy<T>{
  203. void reject(BlockingQueue<T> queue,T task);
  204. }

二、ThreadPoolExecutor

1.继承关系

image.png

2.线程池状态

  1. private static final int COUNT_BITS = Integer.SIZE - 3;
  2. private static final int RUNNING = -1 << COUNT_BITS;
  3. private static final int SHUTDOWN = 0 << COUNT_BITS;
  4. private static final int STOP = 1 << COUNT_BITS;
  5. private static final int TIDYING = 2 << COUNT_BITS;
  6. private static final int TERMINATED = 3 << COUNT_BITS;

从源码可以看出:

状态名称 高3位的值 描述
RUNNING 111 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 不接受新任务,但是处理任务队列中的任务
STOP 001 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 终结状态
  1. // 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 并不是所有平台的int都是32位。
  4. // 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
  5. // 高3位为0,剩余位数全为1
  6. private static final int COUNT_BITS = Integer.SIZE - 3;
  7. // 2^COUNT_BITS次方,表示可以保存的最大线程数
  8. // CAPACITY 的高3位为 0
  9. private static final int CAPACITY = (1 << COUNT_BITS) - 1;

线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示

  • 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值

线程池内部属性也是包含了上述自定义线程池的。

  1. // 工作线程,内部封装了Thread
  2. private final class Worker
  3. extends AbstractQueuedSynchronizer
  4. implements Runnable {
  5. ...
  6. }
  7. // 阻塞队列,用于存放来不及被核心线程执行的任务
  8. private final BlockingQueue<Runnable> workQueue;
  9. // 锁
  10. private final ReentrantLock mainLock = new ReentrantLock();
  11. // 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
  12. private final HashSet<Worker> workers = new HashSet<Worker>();

3.构造方法及参数

直接来看看最多参数的构造:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

参数解释

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
    • maximumPoolSize - corePoolSize = 救急线程数
  • keepAliveTime:救急线程空闲时的最大生存时间
  • unit:时间单位
  • workQueue:阻塞队列(存放任务)
    • 有界阻塞队列 ArrayBlockingQueue
    • 无界阻塞队列 LinkedBlockingQueue
    • 最多只有一个同步元素的 SynchronousQueue
    • 优先队列 PriorityBlockingQueue
  • threadFactory:线程工厂(给线程取名字)不重要
  • handler:拒绝策略 不重要

    工作流程:

  • 当一个任务传给线程池以后,可能有以下几种可能

    • 将任务分配给一个核心线程来执行
    • 核心线程都在执行任务,将任务放到阻塞队列workQueue中等待被执行
    • 阻塞队列满了,使用救急线程来执行任务
      • 救急线程用完以后,超过生存时间(keepAliveTime)后会被释放
    • 任务总数大于了 最大线程数(maximumPoolSize)与阻塞队列容量的最大值(workQueue.capacity),使用拒绝策略

      拒绝策略

      20201022194718.png
  • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略

    1. public static class AbortPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates an {@code AbortPolicy}.
    4. */
    5. public AbortPolicy() { }
    6. /**
    7. * Always throws RejectedExecutionException.
    8. *
    9. * @param r the runnable task requested to be executed
    10. * @param e the executor attempting to execute this task
    11. * @throws RejectedExecutionException always
    12. */
    13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    14. throw new RejectedExecutionException("Task " + r.toString() +
    15. " rejected from " +
    16. e.toString());
    17. }
    18. }
  • DiscardPolicy:放弃本次任务

    1. public static class DiscardPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code DiscardPolicy}.
    4. */
    5. public DiscardPolicy() { }
    6. /**
    7. * Does nothing, which has the effect of discarding task r.
    8. *
    9. * @param r the runnable task requested to be executed
    10. * @param e the executor attempting to execute this task
    11. */
    12. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    13. }
    14. }
  • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之

    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code DiscardOldestPolicy} for the given executor.
    4. */
    5. public DiscardOldestPolicy() { }
    6. /**
    7. * Obtains and ignores the next task that the executor
    8. * would otherwise execute, if one is immediately available,
    9. * and then retries execution of task r, unless the executor
    10. * is shut down, in which case task r is instead discarded.
    11. *
    12. * @param r the runnable task requested to be executed
    13. * @param e the executor attempting to execute this task
    14. */
    15. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    16. if (!e.isShutdown()) {
    17. e.getQueue().poll();
    18. e.execute(r);
    19. }
    20. }
    21. }
  • CallerRunsPolicy:让调用者运行任务

    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code CallerRunsPolicy}.
    4. */
    5. public CallerRunsPolicy() { }
    6. /**
    7. * Executes task r in the caller's thread, unless the executor
    8. * has been shut down, in which case the task is discarded.
    9. *
    10. * @param r the runnable task requested to be executed
    11. * @param e the executor attempting to execute this task
    12. */
    13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    14. if (!e.isShutdown()) {
    15. r.run();
    16. }
    17. }
    18. }

    简单使用

    ```java package panw.ThreadPool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;

@Slf4j public class ThreadPoolExecutorTest { public static void main(String[] args) { // new ThreadPoolExecutor(1,4,1, TimeUnit.SECONDS,); AtomicInteger atomic = new AtomicInteger(0);

  1. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), r -> new Thread(r,"MyThread-"+atomic.getAndIncrement()), new ThreadPoolExecutor.CallerRunsPolicy());
  2. for (int i = 0; i < 10; i++) {
  3. threadPoolExecutor.execute(()->{
  4. log.debug(String.valueOf(Thread.currentThread()));
  5. try {
  6. Thread.sleep(2000);
  7. } catch (InterruptedException e) {
  8. throw new RuntimeException(e);
  9. }
  10. });
  11. }
  12. }

}

  1. <a name="Pd20a"></a>
  2. ### 4.常见四种线程池
  3. <a name="Y0vYw"></a>
  4. #### FixedThreadPool()
  5. 定长线程池:
  6. - 可控制线程最大并发数(同时执行的线程数)
  7. - 超出的线程会在队列中等待
  8. ```java
  9. //nThreads => 最大线程数即maximumPoolSize
  10. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
  11. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
  12. //构造
  13. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  14. return new ThreadPoolExecutor(nThreads, nThreads,
  15. 0L, TimeUnit.MILLISECONDS,
  16. new LinkedBlockingQueue<Runnable>(),
  17. threadFactory);
  18. }

CachedThreadPool

可缓存线程池:

  • 线程数无限制
  • 有空闲线程则复用空闲线程,若无空闲线程则新建线程
  • 一定程序减少频繁创建/销毁线程,减少系统开销

    1. //创建
    2. ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    3. //构造
    4. public static ExecutorService newCachedThreadPool() {
    5. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    6. 60L, TimeUnit.SECONDS,
    7. new SynchronousQueue<Runnable>());
    8. }

    SingleThreadExecutor()

    单线程化的线程池:

  • 有且仅有一个工作线程执行任务

  • 所有任务按照指定顺序执行,即遵循队列的入队出队规则

    1. //创建
    2. ExecutorService singleThreadPool = Executors.newSingleThreadPool();
    3. //构造
    4. public static ExecutorService newSingleThreadExecutor() {
    5. return new FinalizableDelegatedExecutorService
    6. (new ThreadPoolExecutor(1, 1,
    7. 0L, TimeUnit.MILLISECONDS,
    8. new LinkedBlockingQueue<Runnable>()));
    9. }

    几个注意
  • SingleThread和自己创建一个线程来运行多个任务的区别

    • 当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而SingleThread会创建一个新线程,继续执行任务队列中剩余的任务。
  • SingleThread和newFixedThreadPool(1)的区别

    1. // 强转为ThreadPoolExecutor
    2. ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    3. // 改变核心线程数
    4. threadPool.setCorePoolSize(2);
    • newFixedThreadPool(1)传值为1,可以将FixedThreadPool强转为ThreadPoolExecutor,然后通过setCorePoolSize改变核心线程数
    • 而SingleThread无法修改核心线程数,因为返回的是FinalizableDelegatedExecutorService
      1. public static ExecutorService newSingleThreadExecutor() {
      2. return new FinalizableDelegatedExecutorService
      3. (new ThreadPoolExecutor(1, 1,
      4. 0L, TimeUnit.MILLISECONDS,
      5. new LinkedBlockingQueue<Runnable>()));
      6. }

      ScheduledThreadPool()

      定时线程池:
  • 支持定时及周期性任务执行 ```java //nThreads => 最大线程数即maximumPoolSize ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

//构造 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

//ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }

  1. <a name="oGkND"></a>
  2. ##### 定时功能
  3. ```java
  4. ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
  5. //定时,以固定频率,上一个任务开始的时间计时,一个period后,检测上一个任务是否执行完毕,
  6. // 如果上一个任务执行完毕,则当前任务立即执行,
  7. // 如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
  8. pool.scheduleAtFixedRate(()->{log.debug(LocalDateTime.now().toString());},1,5, TimeUnit.SECONDS);
  9. //1表示首次执行任务的延迟时间,5表示每次执行任务的间隔时间,TimeUnit.SECONDS执行的时间间隔数值单位
  10. //定时,以固定的延时 delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。
  11. pool.scheduleWithFixedDelay(()->{log.debug(LocalDateTime.now().toString());},1,5,TimeUnit.SECONDS);