1.线程池原理

所谓线程池,通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放资源。为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列,如图所示。
image.png
通过上面的描述可知,一个完整的线程池应该具备如下要素。

  • 任务队列:用于缓存提交的任务。
  • 线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现, 比如创建线程池时初始的线程数量in it; 线程池自动扩充时最大的线程数量max; 在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。有了这三个参数, 就能够很好地控制线程池中的线程数量, 将其维护在一个合理的范围之内, 三者之间的关系是in it<=core<=max
  • 任务拒绝策略:如果线程数量已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者。
  • 线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。
  • Queue Size:任务队列主要存放提交的Runnable, 但是为了防止内存溢出, 需要有limit数量对其进行控制。
  • Keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔

    2.线程池实现

    线程池实现类图
    image.png

    2.1线程池接口定义

    1.ThreadPool

    ThreadPool主要定义了一个线程池应该具备的基本操作和方法, 下面是ThreadPool接口定义的方法:

    1. public interface ThreadPool {
    2. // 提交任务到线程池
    3. void execute(Runnable runnable);
    4. // 关闭线程池
    5. void shutdown();
    6. // 获取线程池的初始化大小
    7. int getInitSize();
    8. // 获取线程池最大的线程数
    9. int getMaxSize();
    10. // 获取线程池的核心线程数量
    11. int getCoreSize();
    12. // 获取线程池中用于缓存任务队列的大小
    13. int getQueueSize();
    14. // 获取线程池中活跃线程的数量
    15. int getActiveCount();
    16. // 查看线程池是否已经被shutdown
    17. boolean isShutdown();
    18. }

    2.RunnableQueue

    RunanbleQueue主要用于存放提交的Runnable,该Runnable是一个BlockedQueue, 并且有limit的限制, 示例代码所示。

    1. // 任务队列,主要用于缓存提交到线程池中的任务
    2. public interface RunnableQueue {
    3. // 当前新的任务进来时首先会offer到队列中
    4. void offer(Runnable runnable);
    5. // 工作线程通过take方法获取Runnable
    6. Runnable take();
    7. // 获取任务队列中任务的数量
    8. int size();
    9. }

    3.ThreadFactory

    ThreadFactory提供了创建线程的接口,以便于个性化地定制Thread,比如Thread应该被加到哪个Group中, 优先级、线程名字以及是否为守护线程等,示例代码所示。

    1. // 创建线程的工厂
    2. @FunctionalInterface
    3. public interface ThreadFactory {
    4. // 用于创建线程
    5. Thread createThread(Runnable runnable);
    6. }

    4.DenyPolicy

    DenyPolicy主要用于当Queue中的runnable达到了limit上限时,决定采用何种策略通知提交者。该接口中定义了三种默认的实现,具体如代码所示。

  1. @FunctionalInterface
  2. public interface DenyPolicy {
  3. // 拒绝方法
  4. void reject(Runnable runnable, ThreadPool threadPool);
  5. // 该拒绝策略会直接将任务丢弃
  6. class DiscardDenyPolicy implements DenyPolicy {
  7. @Override
  8. public void reject(Runnable runnable, ThreadPool threadPool) {
  9. // do nothing
  10. }
  11. }
  12. // 该拒绝策略会向任务提交者抛出异常
  13. class AbortDenyPolicy implements DenyPolicy {
  14. @Override
  15. public void reject(Runnable runnable, ThreadPool threadPool) {
  16. // throw new Run
  17. }
  18. }
  19. // 该拒绝策略会使任务提交者所在的线程中执行任务
  20. class RunnerDenyPolicy implements DenyPolicy {
  21. @Override
  22. public void reject(Runnable runnable, ThreadPool threadPool) {
  23. if( !threadPool.isShutdown()) {
  24. runnable.run();
  25. }
  26. }
  27. }
  28. }

5.RunnableDenyException

RunnableDenyException是RuntimeException的子类,主要用于通知任务提交者,任务队列已无法再接收新的任务。示例代码如所示。

  1. public class RunnableDenyException extends RuntimeException {
  2. public RunnableDenyException(String message) {
  3. super(message);
  4. }
  5. }

6.InternalTask

InternalTask是Runnable的一个实现,主要用于线程池内部,该类会使用到RunnableQueue,然后不断地从queue中取出某个runnable,并运行runnable的run方法,示例代码如代码所示。

  1. public class InternalTask implements Runnable{
  2. private final RunnableQueue runnableQueue;
  3. private volatile boolean running = true;
  4. public InternalTask(RunnableQueue runnableQueue) {
  5. this.runnableQueue = runnableQueue;
  6. }
  7. @Override
  8. public void run() {
  9. // 如果当前任务为running并且没有被中断,则其将不断地从queue中获取runnable,然后执行run方法
  10. while( running && !Thread.currentThread().isInterrupted() ) {
  11. try {
  12. Runnable task = runnableQueue.take();
  13. task.run();
  14. } catch (Exception e) {
  15. running = false;
  16. break;
  17. }
  18. }
  19. }
  20. public void stop() {
  21. this.running = false;
  22. }
  23. }

2.2线程池详细实现

本节将对线程池进行详细的实现,其中会涉及很多同步的技巧和资源竞争,我们将结合本书第一部分的大多数知识灵活使用,也算是一个综合练习。

1.LinkedRunnableQueue

LinkedRunnableQueue代码实现方式:

  1. import java.util.LinkedList;
  2. public class LinkedRunnableQueue implements RunnableQueue{
  3. // 任务队列的最大容量,在构造时传入
  4. private final int limit;
  5. // 若任务队列中的任务已经满了,则需要执行拒绝策略
  6. private final DenyPolicy denyPolicy;
  7. // 存放任务的队列
  8. private final LinkedList<Runnable> runnableLinkedList = new LinkedList<>();
  9. private final ThreadPool threadPool;
  10. public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
  11. this.limit = limit;
  12. this.denyPolicy = denyPolicy;
  13. this.threadPool = threadPool;
  14. }
  15. /*
  16. 在LinkedRunnableQueue中有几个重要的属性,第一个是limit,也就是Runnable队
  17. 列的上限;当提交的Runnable数量达到limit上限时,则会调用DenyPolicy的reject方法;
  18. runnableList是一个双向循环列表,用于存放Runnable任务,示例代码如下:
  19. */
  20. /*
  21. offer方法是一个同步方法,如果队列数量达到了上限,则会执行拒绝策略,否则会将runnable存放至队列中,同时唤醒take任务的线程:
  22. */
  23. @Override
  24. public void offer(Runnable runnable) {
  25. synchronized (runnableLinkedList) {
  26. if ( runnableLinkedList.size() >= limit ) {
  27. // 无法容纳新的任务时执行拒绝策略
  28. denyPolicy.reject(runnable, threadPool);
  29. } else {
  30. // 将任务加入到队尾,并且唤醒阻塞中的线程
  31. runnableLinkedList.addLast(runnable);
  32. runnableLinkedList.notifyAll();
  33. }
  34. }
  35. }
  36. /*
  37. take方法也是同步方法,线程不断从队列中获取Runnable任务,当队列为空的时候工
  38. 作线程会陷入阻塞,有可能在阻塞的过程中被中断,为了传递中断信号需要在catch语句块
  39. 中将异常抛出以通知上游(Internal Task),示例代码如下:
  40. */
  41. @Override
  42. public Runnable take() throws InterruptedException{
  43. synchronized (runnableLinkedList) {
  44. while(runnableLinkedList.isEmpty()) {
  45. try {
  46. // 如果任务队列中没有可执行任务,则当前线程将会挂起,
  47. // 进入runnableList关联的monitor waitset中等待唤醒(新的任务加入)
  48. runnableLinkedList.wait();
  49. } catch (InterruptedException e) {
  50. // 被中断时需要将异常抛出
  51. throw e;
  52. }
  53. }
  54. return runnableLinkedList.removeFirst();
  55. }
  56. }
  57. // size方法用于返回runnable List的任务个数。
  58. @Override
  59. public int size() {
  60. synchronized (runnableLinkedList) {
  61. // 返回当前任务队列中的任务数
  62. return runnableLinkedList.size();
  63. }
  64. }
  65. }

2.初始化线程池

根据前面的讲解,线程池需要有数量控制属性、创建线程工厂、任务队列策略等功能,线程池初始化代码如所示

  1. import java.util.ArrayDeque;
  2. import java.util.Queue;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. public class BasicThreadPool extends Thread implements ThreadPool {
  6. // 初始化线程数量
  7. private final int initSize;
  8. // 线程池最大线程数量
  9. private final int maxSize;
  10. // 线程池核心线程数量
  11. private final int coreSize;
  12. // 当前活跃的线程数量
  13. private int activeCount;
  14. // 创建线程所需的工厂
  15. private final ThreadFactory threadFactory;
  16. // 任务队列
  17. private final RunnableQueue runnableQueue;
  18. // 线程池是否已经被shutdown
  19. private volatile boolean isShutdown = false;
  20. // 工作线程队列
  21. private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
  22. private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
  23. private final static ThreadFactory DEFAULT_THREAD_FACTORY=new DefaultThreadFactory();
  24. private final long keepAliveTime;
  25. private final TimeUnit timeUnit;
  26. // 构造时需要传递的参数:初始化的线程数量,最大的线程数量,核心线程数量,任务队列的最大数量
  27. public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
  28. this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
  29. }
  30. public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,
  31. int queueSize, DenyPolicy denyPolicy, long keepAliveTime,
  32. TimeUnit timeUnit) {
  33. this.initSize = initSize;
  34. this.maxSize = maxSize;
  35. this.coreSize = coreSize;
  36. this.threadFactory = threadFactory;
  37. this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
  38. this.keepAliveTime = keepAliveTime;
  39. this.timeUnit = timeUnit;
  40. this.init();
  41. }
  42. // 初始化时,先创建initSize个线程
  43. private void init() {
  44. start();
  45. for(int i = 0; i < initSize; i++) {
  46. newThread();
  47. }
  48. }
  49. /*
  50. 一个线程池除了控制参数之外,最主要的是应该有活动线程,其中Queue<Thread-Task>主要用来存放活动线程,
  51. BasicThreadPool同时也是Thread的子类,它在初始化的时候启动, 在keepalive时间间隔到了之后再自动维护
  52. 活动线程数量(采用继承Thread的方式其实不是一种好的方法,因为Basic ThreadPool会暴露Thread的方法,
  53. 建议将继承关系更改为组合关系,读者可以自行修改)。
  54. */
  55. /*
  56. 提交任务非常简单,只是将Runnable插入runnableQueue中即可。
  57. */
  58. @Override
  59. public void execute(Runnable runnable) {
  60. if( this.isShutdown ) {
  61. throw new IllegalStateException("The thread pool is destory");
  62. }
  63. // 提交任务只是简单地往任务队列中插入Runnable
  64. this.runnableQueue.offer(runnable);
  65. }
  66. // 4.线程池自动维护
  67. /*
  68. 线程池中线程数量的维护主要由run负责, 这也是为什么BasicThreadPool继承自Thread了,
  69. 不过笔者不推荐使用直接继承的方式, 线程池自动维护代码如下:
  70. */
  71. private void newThread() {
  72. // 创建任务线程,并且启动
  73. InternalTask internalTask = new InternalTask(runnableQueue);
  74. Thread thread = this.threadFactory.createThread(internalTask);
  75. ThreadTask threadTask = new ThreadTask(thread, internalTask);
  76. threadQueue.offer(threadTask);
  77. this.activeCount++;
  78. this.start();
  79. }
  80. private void removeThread(){
  81. //从线程池中移除某个线程
  82. ThreadTask threadTask = threadQueue.remove();
  83. threadTask.internalTask.stop();
  84. this.activeCount--;
  85. }
  86. @Override
  87. public void run() {
  88. // run方法继承自Thread主要用于维护线程数量,比如扩容,回收等工作
  89. while(!isShutdown && !isInterrupted() ) {
  90. try {
  91. timeUnit.sleep(keepAliveTime);
  92. } catch (InterruptedException e) {
  93. isShutdown = true;
  94. break;
  95. }
  96. synchronized (this) {
  97. if ( isShutdown) break;
  98. // 当前的队列中有任务尚未处理,并且activeCount<coreSize 则继续扩容
  99. if ( runnableQueue.size() > 0 && activeCount < coreSize ) {
  100. for(int i = initSize; i < coreSize; i++) {
  101. newThread();
  102. }
  103. // continue的目的在于不想让线程的扩容直接达到maxSize
  104. continue;
  105. }
  106. // 当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
  107. if ( runnableQueue.size() > 0 && activeCount < maxSize ) {
  108. for( int i = coreSize; i < maxSize; i++) {
  109. newThread();
  110. }
  111. }
  112. // 如果任务队列中没有任务,则需要回收,回收至coreSize即可
  113. if ( runnableQueue.size() == 0 && activeCount > coreSize) {
  114. for( int i = coreSize; i < activeCount; i++) {
  115. removeThread();
  116. }
  117. }
  118. }
  119. }
  120. }
  121. /*
  122. 5. 线程池销毁
  123. 线程池的销毁同样需要同步机制的保护,
  124. 主要是为了防止与线程池本身的维护线程引起数据冲突,线程池销毁代码如下:
  125. */
  126. /*
  127. 销毁线程池主要为了是停止BasicThreadPool线程, 停止线程池中的活动线程并且将
  128. isShutdown开关变量更改为true
  129. */
  130. /*
  131. 1.下面重点来解说线程自动维护的方法,自动维护线程的代码块是同步代码块,主要是为了阻止在线程维护过程中线程池销毁引起的数据不一致问题。
  132. 2.任务队列中若存在积压任务, 并且当前活动线程少于核心线程数, 则新建coreSize initSize数量的线程,并且将其加入到活动线程队列中
  133. 为了防止马上进行coreSize数量的扩充,建议使用continue终止本次循环。
  134. 3.任务队列中有积压任务,并且当前活动线程少于最大线程数,则新建maxSize-coreSize数量的线程,并且将其加入到活动队列中。
  135. 4.当前线程池不够繁忙时,则需要回收部分线程,回收到coreSize数量即可,回收时调用removeThread()方法,
  136. 在该方法中需要考虑的一点是,如果被回收的线程恰巧从Runnable任务取出了某个任务,
  137. 则会继续保持该线程的运行, 直到完成了任务的运行为止,详见Internal Task的run方法。
  138. */
  139. @Override
  140. public void shutdown() {
  141. synchronized (this) {
  142. if ( isShutdown ) return;
  143. isShutdown = true;
  144. threadQueue.forEach(threadTask -> {
  145. threadTask.internalTask.stop();
  146. threadTask.thread.interrupt();
  147. });
  148. this.interrupt();
  149. }
  150. }
  151. @Override
  152. public int getInitSize() {
  153. if( isShutdown)
  154. throw new IllegalStateException("The thread pool is destory");
  155. return this.initSize;
  156. }
  157. @Override
  158. public int getMaxSize() {
  159. if ( isShutdown )
  160. throw new IllegalStateException("The thread pool is destory");
  161. return this.maxSize;
  162. }
  163. @Override
  164. public int getCoreSize() {
  165. if ( isShutdown )
  166. throw new IllegalStateException("The thread pool is destory");
  167. return this.coreSize;
  168. }
  169. @Override
  170. public int getQueueSize() {
  171. if ( isShutdown )
  172. throw new IllegalStateException("The thread pool is destory");
  173. return runnableQueue.size();
  174. }
  175. @Override
  176. public int getActiveCount() {
  177. synchronized (this) {
  178. return this.activeCount;
  179. }
  180. }
  181. @Override
  182. public boolean isShutdown() {
  183. return this.isShutdown;
  184. }
  185. private static class DefaultThreadFactory implements ThreadFactory {
  186. private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
  187. private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"
  188. + GROUP_COUNTER.getAndDecrement());
  189. private static final AtomicInteger COUNTER = new AtomicInteger(0);
  190. @Override
  191. public Thread createThread(Runnable runnable) {
  192. return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement());
  193. }
  194. }
  195. private static class ThreadTask {
  196. Thread thread;
  197. InternalTask internalTask;
  198. public ThreadTask(Thread thread, InternalTask internalTask) {
  199. this.thread = thread;
  200. this.internalTask = internalTask;
  201. }
  202. }
  203. }

3.线程池应用

本节将写一个简单的程序分别测试线程池的任务提交、线程池线程数量的动态扩展,以及线程池的销毁功能,代码如所示。

  1. import java.util.concurrent.TimeUnit;
  2. public class ThreadPoolTest {
  3. public static void main(String[] args) throws InterruptedException {
  4. // 定义线程池,初始化线程数量2, 核心线程数为4, 最大线程数为6, 任务队列最多允许1000个任务
  5. final ThreadPool threadPool = new BasicThreadPool(2,6,4,1000);
  6. // 定义20个任务并且提交给线程池
  7. for(int i = 0; i < 20; i++ ) {
  8. threadPool.execute(
  9. ()-> {
  10. try {
  11. TimeUnit.SECONDS.sleep(10);
  12. System.out.println(Thread.currentThread().getName() + " is running and done.");
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. );
  18. }
  19. // 用于获取一些线程池信息
  20. // for(;;) {
  21. // System.out.println("getActiveCount:" + threadPool.getActiveCount());
  22. // System.out.println("getQueueSize:" + threadPool.getQueueSize());
  23. // System.out.println("getCoreSize:" + threadPool.getCoreSize());
  24. // System.out.println("getMaxSize:" + threadPool.getMaxSize());
  25. // TimeUnit.SECONDS.sleep(5);
  26. // }
  27. Thread.currentThread().join();
  28. }
  29. }

程序运行12秒之后,线程池将被销毁,线程池中的线程都将被销毁,同样为了验证所有线程是否被成功销毁, 也可以借助JVM工具查看堆栈信息, 这也是在最后使用current Thread join进行阻塞以便于查看的原因。