6. ThreadGroup详解

6.1 ThreadGroup和Thread

新的线程都会被加入到main线程所在的group中, main线程的group名字同线程名。ThreadGroup同样存在父子关系. 如下:
Java高并发编程详解(一) - 图1

6.2 创建ThreadGroup

  1. package com.zoro.concurrent.chapter06;
  2. /**
  3. * 创建ThreadGroup
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public class ThreadGroupCreator {
  9. public static void main(String[] args) {
  10. ThreadGroup currentGroup = Thread.currentThread().getThreadGroup();
  11. //currentGroup = java.lang.ThreadGroup[name=main,maxpri=10]
  12. System.out.println("currentGroup = " + currentGroup);
  13. ThreadGroup group1 = new ThreadGroup("Group1");
  14. //输出true
  15. System.out.println(group1.getParent() == currentGroup);
  16. ThreadGroup group2 = new ThreadGroup(group1, "Group2");
  17. //输出true
  18. System.out.println(group2.getParent() == group1);
  19. }
  20. }

6.3 复制Thread数组和ThreadGroup数组

6.3.1 复制Thread数组

前两个方法会将ThreadGroup中的active线程全部复制到Thread数组中
recurse = true; 递归复制——将所有子group中的active线程都递归到Thread数组中
recurse = false; 非递归复制
image.png

  • enumerate获取的仅仅是个预估值, 并不能100%地保证当前group的活跃线程.如: 在调用复制之后, 某个线程结束了生命周期/新的线程加入了进来, 均会导致数据的不准确
  • enumerate方法的返回值int较Thread[]的长度更为真实, 如: 定义了数组长度的Thread数组, 那么enumerate仅仅会将当前活跃的thread分别放进数组中, 而int代表真是的数量, 而并非数组的长度;

    6.3.2 复制ThreadGroup数组

    image.png

    6.4 ThreadGroup操作

    ThreadGroup不能提供对线程的管理, 主要功能是对线程 进行组织.

    6.4.1 ThreadGroup基操

    image.png
    image.png
    略.

    6.4.2 ThreadGroup的interrupt

    interrupt一个ThreadGroup会导致该group中所有的active线程都被interrupt, 即: 该group中每个线程的interrupt标识都被设置了

    6.4.3 ThreadGroup的destroy

    destroy用于销毁ThreadGroup, 只针对一个没有任何active线程的group进行一次destroy标记, 调用该方法的直接结果是在父group中将自己移除

    6.4.4 守护ThreadGroup

    像Thread一样, 但是将ThreadGroup设置为daemon, 并不会影响线程的daemon属性; 如果一个ThreadGroup的daemon被设置为true, 那么在group中没有任何active线程的时候该group将自动destroy

    7. Hook线程以及捕获线程异常执行

    7.1 获取线程运行时异常

  • _public static void _setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

  • _public static _Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler()
  • _public _Thread.UncaughtExceptionHandler getUncaughtExceptionHandler()
  • _public void _setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

image.png
image.png

7.1.1 UncaughtExceptionHandler介绍

线程在执行单元中是不允许抛出checked异常的, 且线程运行在自己的上下文中, 派生它的线程将无法直接获得它运行中出现的异常信息; UncaughtExceptionHandler 接口: 当线程在运行过程中出现异常时, 会回调UncaughtExceptionHandler 接口, 从而得知是哪个线程在运行时出错, 以及出现了什么样的错误

java.lang.Thread#dispatchUncaughtException()调用了java.lang.Thread.UncaughtExceptionHandler#uncaughtException()
实际运行过程中出现异常, JVM会调用dispatchUncaughtException()方法, 该方法会将对应的线程实例以及异常信息传递给回调接口.

  1. @FunctionalInterface
  2. public interface UncaughtExceptionHandler {
  3. /**
  4. * Method invoked when the given thread terminates due to the
  5. * given uncaught exception.
  6. * <p>Any exception thrown by this method will be ignored by the
  7. * Java Virtual Machine.
  8. * @param t the thread
  9. * @param e the exception
  10. */
  11. void uncaughtException(Thread t, Throwable e);
  12. }
  13. /**
  14. * Dispatch an uncaught exception to the handler. This method is
  15. * intended to be called only by the JVM.
  16. */
  17. private void dispatchUncaughtException(Throwable e) {
  18. getUncaughtExceptionHandler().uncaughtException(this, e);
  19. }

7.1.2 实例

  1. package com.zoro.concurrent.chapter07;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * 捕获线程异常
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class CaptureThreadException {
  10. /**
  11. * 输出如下:
  12. * Test-Threadoccur exception:
  13. * java.lang.ArithmeticException: / by zero
  14. * at com.zoro.concurrent.chapter07.CaptureThreadException.lambda$main$1(CaptureThreadException.java:27)
  15. * at java.lang.Thread.run(Thread.java:748)
  16. */
  17. public static void main(String[] args) {
  18. //设置回调接口
  19. Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
  20. System.out.println(t.getName() + "occur exception: ");
  21. e.printStackTrace();
  22. });
  23. final Thread thread = new Thread(() -> {
  24. try {
  25. TimeUnit.SECONDS.sleep(2);
  26. } catch (InterruptedException e) {
  27. //e.printStackTrace();
  28. }
  29. //unchecked异常
  30. System.out.println(1 / 0);
  31. }, "Test-Thread");
  32. thread.start();
  33. }
  34. }

7.1.3 分析sourceCode

java.lang.Thread#getUncaughtExceptionHandler

  1. public UncaughtExceptionHandler getUncaughtExceptionHandler() {
  2. return uncaughtExceptionHandler != null ?
  3. uncaughtExceptionHandler : group;
  4. }

java.lang.ThreadGroup#uncaughtException

  1. public void uncaughtException(Thread t, Throwable e) {
  2. if (parent != null) {
  3. parent.uncaughtException(t, e);
  4. } else {
  5. Thread.UncaughtExceptionHandler ueh =
  6. Thread.getDefaultUncaughtExceptionHandler();
  7. if (ueh != null) {
  8. ueh.uncaughtException(t, e);
  9. } else if (!(e instanceof ThreadDeath)) {
  10. System.err.print("Exception in thread \""
  11. + t.getName() + "\" ");
  12. e.printStackTrace(System.err);
  13. }
  14. }
  15. }
  • 该ThreadGroup如果有父ThreadGroup, 则直接调用父ThreadGroup的uncaughtException方法
  • 如果设置了全局默认的UncaughtExceptionHandler, 则调用uncaughtException方法
  • 若既没有父ThreadGroup, 又没有设置全局默认的UncaughtExceptionHandler, 则会直接将异常的堆栈信息定向到System.err中 ```java package com.zoro.concurrent.chapter07;

import java.util.concurrent.TimeUnit;

/**

  • @author yx.jiang
  • @date 2021/7/27 10:26 */ public class EmptyExceptionHandler {
  1. /**
  2. * 输出如下:
  3. * mainGroup.getName() = main
  4. * mainGroup.getParent() = java.lang.ThreadGroup[name=system,maxpri=10]
  5. * mainGroup.getParent().getParent() = null
  6. * Exception in thread "Test-Thread" java.lang.ArithmeticException: / by zero
  7. * at com.zoro.concurrent.chapter07.EmptyExceptionHandler.lambda$main$0(EmptyExceptionHandler.java:29)
  8. * at java.lang.Thread.run(Thread.java:748)
  9. */
  10. public static void main(String[] args) {
  11. //设置回调接口
  12. ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
  13. System.out.println("mainGroup.getName() = " + mainGroup.getName());
  14. System.out.println("mainGroup.getParent() = " + mainGroup.getParent());
  15. System.out.println("mainGroup.getParent().getParent() = " + mainGroup.getParent().getParent());
  16. final Thread thread = new Thread(() -> {
  17. try {
  18. TimeUnit.SECONDS.sleep(2);
  19. } catch (InterruptedException e) {
  20. //e.printStackTrace();
  21. }
  22. //unchecked异常
  23. System.out.println(1 / 0);
  24. }, "Test-Thread");
  25. thread.start();
  26. }

}

  1. ![](https://cdn.nlark.com/yuque/0/2021/jpeg/708204/1627541129891-79306ddd-43b2-4353-8ce9-efc5828d0948.jpeg)
  2. <a name="ac0bT"></a>
  3. ### 7.2 注入钩子线程
  4. <a name="t3szI"></a>
  5. #### 7.2.1 Hook线程介绍
  6. ```java
  7. package com.zoro.concurrent.chapter07;
  8. import java.util.concurrent.TimeUnit;
  9. /**
  10. * 钩子线程
  11. *
  12. * @author yx.jiang
  13. * @date 2021/7/27 10:26
  14. */
  15. public class ThreadHook {
  16. /**
  17. * 输出如下:
  18. * 程序即将关闭...
  19. * 钩子线程1启动...
  20. * 钩子线程2启动...
  21. * 钩子线程1即将关闭
  22. * 钩子线程2即将关闭
  23. */
  24. public static void main(String[] args) {
  25. //注册第一个钩子线程
  26. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  27. System.out.println("钩子线程1启动...");
  28. try {
  29. TimeUnit.SECONDS.sleep(2);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. System.out.println("钩子线程1即将关闭");
  34. }));
  35. //注册第二个钩子线程
  36. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  37. System.out.println("钩子线程2启动...");
  38. try {
  39. TimeUnit.SECONDS.sleep(2);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. System.out.println("钩子线程2即将关闭");
  44. }));
  45. System.out.println("程序即将关闭...");
  46. }
  47. }

7.2.2 Hook实战

  1. package com.zoro.concurrent.chapter07;
  2. import java.io.File;
  3. import java.io.IOException;
  4. import java.nio.file.Files;
  5. import java.nio.file.Path;
  6. import java.nio.file.Paths;
  7. import java.nio.file.attribute.PosixFilePermission;
  8. import java.nio.file.attribute.PosixFilePermissions;
  9. import java.util.Set;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * 防止重复
  13. * <h>防止某个程序重复启动, 在进程启动时创建一个.lock文件, 进程收到中断信号的时候会删除这个lock文件</h>
  14. *
  15. * @author yx.jiang
  16. * @date 2021/7/27 10:26
  17. */
  18. public class PreventDuplicated {
  19. private static final String LOCK_PATH = "D:\\idea tool box\\locks";
  20. private static final String LOCK_FILE = ".lock";
  21. private static final String PERMISSION = "rw-------";
  22. public static void main(String[] args) throws IOException {
  23. //注入Hook线程, 在程序退出时删除lock文件
  24. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  25. System.out.println("程序接收到 kill 信号.");
  26. getLockFile().toFile().delete();
  27. }));
  28. //检查是否存在.lock文件
  29. checkRunning();
  30. //简单模拟当前进程正在运行
  31. for (; ; ) {
  32. try {
  33. TimeUnit.SECONDS.sleep(1);
  34. System.out.println("程序正在运行...");
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. public static void checkRunning() throws IOException {
  41. Path path = getLockFile();
  42. if (path.toFile().exists()) {
  43. throw new RuntimeException("程序正在运行...");
  44. }
  45. //WINDOWS
  46. Path file1 = Files.createFile(path);
  47. File file = file1.toFile();
  48. file.setReadable(true);
  49. file.setWritable(true);
  50. System.out.println(file.exists() + " " + file.getAbsolutePath());
  51. // 'posix:permissions' not supported as initial attribute
  52. // UNIX
  53. // Set<PosixFilePermission> perms = PosixFilePermissions.fromString(PERMISSION);
  54. // Files.createFile(path, perms);
  55. }
  56. public static Path getLockFile() {
  57. return Paths.get(LOCK_PATH, LOCK_FILE);
  58. }
  59. }

image.png

7.2.3 注意事项

  • Hook线程只有在收到退出信号的时候会被执行, 如果在kill的时候使用了参数 **-9**, 那么Hook线程不会得到执行, 进程将会立即退出, 因此.lock文件将得不到清理.
  • Hook线程中也可以执行一些资源的释放工作, 比如关闭文件句柄, socket链接, 数据库connection等.
  • 尽量不要在Hook线程中执行一些耗时非常长的操作, 因为会导致程序迟迟不能退出

    8. 线程池原理及自定义线程池

    8.1 线程池原理

  • 任务队列: 用于缓存提交任务

  • 线程数量管理功能: 一个线程池必须能够很好的管理和控制线程数量, 可通过如下三个参数来实现, 比如创建线程池时初始的线程数量init; 线程池自动扩充时最大的核心线程数量max; 在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core. init <= core <= max;
  • 任务拒绝策略: 如果线程数量已经到上限且任务队列已满, 则需要有相应的拒绝策略来通知任务提交者;
  • 线程工厂: 主要用于个性化定制线程, 比如将线程设置为守护线程以及设置线程名称等;
  • QueueSize: 任务队列主要用于存放提交的Runnable, 但是为了防止内存溢出, 需要有limit数量对其进行控制
  • keepedAlive时间: 该时间主要决定线程各个重要参数自动维护的时间间隔.

    8.2 线程池实现

    image.png

    ThreadPool

    ```java package com.zoro.concurrent.chapter08.threadpool;

/**

  • 线程池接口 *
  • @author yx.jiang
  • @date 2021/7/27 10:26 / public interface ThreadPool { /*

    • 提交任务到线程池 *
    • @param runnable runnable */ void execute(Runnable runnable);

      /**

    • 关闭并销毁线程池 */ void shutdown();

      /**

    • 获取线程池的初始化大小 *
    • @return */ int getInitSize();

      /**

    • 获取线程池最大的线程数 *
    • @return */ int getMaxSize();

      /**

    • 获取线程池的核心线程数量 *
    • @return */ int getCoreSize();

      /**

    • 获取线程池中用于缓存任务队列的大小 *
    • @return */ int getQueueSize();

      /**

    • 获取线程池中活跃线程的数量 *
    • @return */ int getActiveCount();

      /**

    • 查看线程池是否已经被销毁(shutdown) *
    • @return */ boolean isShutdown(); }
  1. <a name="KWWWf"></a>
  2. #### ThreadFactory
  3. ```java
  4. package com.zoro.concurrent.chapter08.threadpool;
  5. /**
  6. * 创建Thread的工厂
  7. *
  8. * @author yx.jiang
  9. * @date 2021/7/27 10:26
  10. */
  11. @FunctionalInterface
  12. public interface ThreadFactory {
  13. /**
  14. * 创建线程
  15. *
  16. * @param runnable 执行单元/任务
  17. * @return 线程
  18. */
  19. Thread createThread(Runnable runnable);
  20. }

RunnableQueue

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. /**
  3. * 任务队列
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public interface RunnableQueue {
  9. /**
  10. * 将任务提交至队列中
  11. *
  12. * @param runnable
  13. */
  14. void offer(Runnable runnable);
  15. /**
  16. * 工作线程通过take获取Runnable
  17. */
  18. Runnable take() throws InterruptedException;
  19. /**
  20. * 获取任务队列中任务的数量
  21. *
  22. * @return
  23. */
  24. int size();
  25. }

DenyPolicy

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. /**
  3. * 拒绝策略
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. @FunctionalInterface
  9. public interface DenyPolicy {
  10. /**
  11. * 拒绝方法
  12. *
  13. * @param runnable 任务/执行单元
  14. * @param threadPool 线程池
  15. */
  16. void reject(Runnable runnable, ThreadPool threadPool);
  17. /**
  18. * 直接丢弃Runnable任务
  19. */
  20. class DiscardDenyPolicy implements DenyPolicy {
  21. @Override
  22. public void reject(Runnable runnable, ThreadPool threadPool) {
  23. //do nothing
  24. }
  25. }
  26. /**
  27. * 抛出{@link RunnableDenyException}异常
  28. */
  29. class AbortDenyPolicy implements DenyPolicy {
  30. @Override
  31. public void reject(Runnable runnable, ThreadPool threadPool) {
  32. throw new RunnableDenyException("执行单元/任务: " + runnable + "将被放弃");
  33. }
  34. }
  35. /**
  36. * 交由调用者的线程直接运行Runnable, 而不会被加入到线程池中
  37. */
  38. class RunnerDenyPolicy implements DenyPolicy {
  39. @Override
  40. public void reject(Runnable runnable, ThreadPool threadPool) {
  41. //do nothing
  42. }
  43. }
  44. }

RunnableDenyException

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. /**
  3. * 可运行拒绝异常
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public class RunnableDenyException extends RuntimeException {
  9. public RunnableDenyException(String message) {
  10. super(message);
  11. }
  12. }

LinkedRunnableQueue

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. import java.util.LinkedList;
  3. /**
  4. * 任务队列实现
  5. *
  6. * @author yx.jiang
  7. * @date 2021/7/27 10:26
  8. */
  9. public class LinkedRunnableQueue implements RunnableQueue {
  10. /**
  11. * 任务队列最大容量
  12. */
  13. private final int limit;
  14. /**
  15. * 任务队列满后,需执行拒绝策略
  16. */
  17. private final DenyPolicy denyPolicy;
  18. private final LinkedList<Runnable> runnableList = new LinkedList<>();
  19. private final ThreadPool threadPool;
  20. public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
  21. this.limit = limit;
  22. this.denyPolicy = denyPolicy;
  23. this.threadPool = threadPool;
  24. }
  25. @Override
  26. public void offer(Runnable runnable) {
  27. synchronized (runnableList) {
  28. if (runnableList.size() >= limit) {
  29. //无法容纳新的任务时执行拒绝策略
  30. denyPolicy.reject(runnable, threadPool);
  31. } else {
  32. //将任务加入到队尾, 并唤醒阻塞中的线程
  33. runnableList.addLast(runnable);
  34. runnableList.notifyAll();
  35. }
  36. }
  37. }
  38. @Override
  39. public Runnable take() throws InterruptedException {
  40. synchronized (runnableList) {
  41. while (runnableList.isEmpty()) {
  42. try {
  43. //如果任务队列中没有可执行的任务,则当前线程会挂起,进入runnableList关联的monitor waitSet 中等待唤醒(新的任务加入)
  44. runnableList.wait();
  45. } catch (InterruptedException e) {
  46. throw e;
  47. }
  48. }
  49. //从任务队列头部移除一个任务
  50. return runnableList.removeFirst();
  51. }
  52. }
  53. @Override
  54. public int size() {
  55. synchronized (runnableList) {
  56. //返回当前任务队列中的任务数
  57. return runnableList.size();
  58. }
  59. }
  60. }

InternalTask

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. /**
  3. * 内部任务(线程池内)
  4. *
  5. * @author yx.jiang
  6. * @date 2021/7/27 10:26
  7. */
  8. public class InternalTask implements Runnable {
  9. private final RunnableQueue runnableQueue;
  10. private volatile boolean running = true;
  11. public InternalTask(RunnableQueue runnableQueue) {
  12. this.runnableQueue = runnableQueue;
  13. }
  14. @Override
  15. public void run() {
  16. //如果当任务为running并且没有被中断, 则其将不断的从queue中获取runnable, 然后执行run方法
  17. while (running && !Thread.currentThread().isInterrupted()) {
  18. try {
  19. Runnable task = runnableQueue.take();
  20. task.run();
  21. } catch (InterruptedException e) {
  22. running = false;
  23. break;
  24. }
  25. }
  26. }
  27. /**
  28. * 停止当前任务, 在线程池的shutdown方法中使用
  29. */
  30. public void stop() {
  31. this.running = false;
  32. }
  33. }

BasicThreadPool

  1. package com.zoro.concurrent.chapter08.threadpool;
  2. import java.util.ArrayDeque;
  3. import java.util.Queue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. /**
  7. * 初始化线程池
  8. * FIXME: 这里不建议 extends Thread; 用组合的方式更好些;
  9. *
  10. * @author yx.jiang
  11. * @date 2021/7/27 10:26
  12. */
  13. public class BasicThreadPool extends Thread implements ThreadPool {
  14. public static final String THREAD_POOL_DESTROYED = "线程池已销毁";
  15. /**
  16. * 初始化线程数量
  17. */
  18. private final int initSize;
  19. /**
  20. * 最大线程数量
  21. */
  22. private final int maxSize;
  23. /**
  24. * 核心线程数量
  25. */
  26. private final int coreSize;
  27. /**
  28. * 当前活跃线程数量
  29. */
  30. private int activeCount;
  31. /**
  32. * 创建线程所需工厂
  33. */
  34. private final ThreadFactory threadFactory;
  35. /**
  36. * 任务队列
  37. */
  38. private final RunnableQueue runnableQueue;
  39. /**
  40. * 线程池是否销毁
  41. */
  42. private volatile boolean isShutdown = false;
  43. /**
  44. * 工作队列
  45. */
  46. private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
  47. /**
  48. * 默认拒绝策略
  49. */
  50. private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
  51. /**
  52. * 默认线程工厂
  53. */
  54. private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
  55. private final long keepAliveTime;
  56. private final TimeUnit timeUnit;
  57. public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
  58. this.initSize = initSize;
  59. this.maxSize = maxSize;
  60. this.coreSize = coreSize;
  61. this.threadFactory = threadFactory;
  62. this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
  63. this.keepAliveTime = keepAliveTime;
  64. this.timeUnit = timeUnit;
  65. this.init();
  66. }
  67. public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
  68. this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);
  69. }
  70. /**
  71. * 初始化时 —— 创建initSize个线程
  72. */
  73. private void init() {
  74. start();
  75. for (int i = 0; i < initSize; i++) {
  76. newThread();
  77. }
  78. }
  79. private void newThread() {
  80. //创建线程并启动
  81. InternalTask internalTask = new InternalTask(runnableQueue);
  82. Thread thread = threadFactory.createThread(internalTask);
  83. ThreadTask threadTask = new ThreadTask(thread, internalTask);
  84. threadQueue.add(threadTask);
  85. this.activeCount++;
  86. thread.start();
  87. }
  88. private void removeThread() {
  89. //从线程池中移除某个线程
  90. ThreadTask threadTask = threadQueue.remove();
  91. threadTask.internalTask.stop();
  92. this.activeCount--;
  93. }
  94. /**
  95. * 同步代码块:为了阻止在线程维护过程中线程池销毁引起的数据不一致问题
  96. */
  97. @Override
  98. public void run() {
  99. //该方法继承自Thread,主要用于维护线程数量,如:扩容、回收工作
  100. while (!isShutdown && !isInterrupted()) {
  101. try {
  102. timeUnit.sleep(keepAliveTime);
  103. } catch (InterruptedException e) {
  104. isShutdown = true;
  105. break;
  106. }
  107. synchronized (this) {
  108. if (isShutdown) {
  109. break;
  110. }
  111. //当前的队列中有任务尚未处理,且activeCount < coreSize,继续扩容
  112. if (runnableQueue.size() > 0 && activeCount < coreSize) {
  113. for (int i = initSize; i < coreSize; i++) {
  114. newThread();
  115. }
  116. //目的:不想让线程的扩容直接到达maxSize
  117. continue;
  118. }
  119. //当前的对列中有任务尚未处理,且activeCount < maxSize,继续扩容
  120. if (runnableQueue.size() > 0 && activeCount < maxSize) {
  121. for (int i = coreSize; i < maxSize; i++) {
  122. newThread();
  123. }
  124. }
  125. //若任务队列无任务,需要回收,回收至coreSize即可
  126. if (runnableQueue.size() == 0 && activeCount > coreSize) {
  127. for (int i = coreSize; i < activeCount; i++) {
  128. removeThread();
  129. }
  130. }
  131. }
  132. }
  133. }
  134. @Override
  135. public void execute(Runnable runnable) {
  136. if (this.isShutdown) {
  137. throw new IllegalStateException(THREAD_POOL_DESTROYED);
  138. }
  139. //向任务队列插入Runnable
  140. this.runnableQueue.offer(runnable);
  141. }
  142. /**
  143. * 同步机制保护:防止与线程池本身的维护线程引起数据冲突
  144. * 停止BasicThreadPool线程,停止线程池中的活动线程,将isShutdown开关改为true
  145. */
  146. @Override
  147. public void shutdown() {
  148. synchronized (this) {
  149. if (isShutdown) {
  150. return;
  151. }
  152. isShutdown = true;
  153. threadQueue.forEach(threadTask -> {
  154. threadTask.internalTask.stop();
  155. threadTask.thread.interrupt();
  156. });
  157. this.interrupt();
  158. }
  159. }
  160. @Override
  161. public int getInitSize() {
  162. if (isShutdown) {
  163. throw new IllegalStateException("线程池已销毁");
  164. }
  165. return this.initSize;
  166. }
  167. @Override
  168. public int getMaxSize() {
  169. if (isShutdown) {
  170. throw new IllegalStateException("线程池已销毁");
  171. }
  172. return this.maxSize;
  173. }
  174. @Override
  175. public int getCoreSize() {
  176. if (isShutdown) {
  177. throw new IllegalStateException("线程池已销毁");
  178. }
  179. return this.coreSize;
  180. }
  181. @Override
  182. public int getQueueSize() {
  183. if (isShutdown) {
  184. throw new IllegalStateException("线程池已销毁");
  185. }
  186. return runnableQueue.size();
  187. }
  188. @Override
  189. public int getActiveCount() {
  190. synchronized (this) {
  191. return this.activeCount;
  192. }
  193. }
  194. @Override
  195. public boolean isShutdown() {
  196. return this.isShutdown;
  197. }
  198. /**
  199. * {@link Thread} 和 {@link InternalTask}的一个组合
  200. */
  201. private static class ThreadTask {
  202. Thread thread;
  203. InternalTask internalTask;
  204. public ThreadTask(Thread thread, InternalTask internalTask) {
  205. this.thread = thread;
  206. this.internalTask = internalTask;
  207. }
  208. }
  209. /**
  210. * 默认线程工厂
  211. */
  212. private static class DefaultThreadFactory implements ThreadFactory {
  213. private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
  214. private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());
  215. private static final AtomicInteger COUNTER = new AtomicInteger(0);
  216. @Override
  217. public Thread createThread(Runnable runnable) {
  218. return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndIncrement());
  219. }
  220. }
  221. }

ThreadPoolTest

  1. package com.zoro.concurrent.chapter08;
  2. import com.zoro.concurrent.chapter08.threadpool.BasicThreadPool;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * 线程池测试类
  6. *
  7. * @author yx.jiang
  8. * @date 2021/7/27 10:26
  9. */
  10. public class ThreadPoolTest {
  11. public static void main(String[] args) throws InterruptedException {
  12. BasicThreadPool basicThreadPool = new BasicThreadPool(2, 6, 4, 1000);
  13. for (int i = 0; i < 20; i++) {
  14. basicThreadPool.execute(() -> {
  15. try {
  16. TimeUnit.SECONDS.sleep(10);
  17. System.out.println(Thread.currentThread().getName() + " 已启动");
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. });
  22. }
  23. //验证线程池中的动态扩展情况及任务执行情况;最终activeCount = coreSize
  24. /*for (; ;) {
  25. System.out.println("basicThreadPool.getActiveCount() = " + basicThreadPool.getActiveCount());
  26. System.out.println("basicThreadPool.getQueueSize() = " + basicThreadPool.getQueueSize());
  27. System.out.println("basicThreadPool.getCoreSize() = " + basicThreadPool.getCoreSize());
  28. System.out.println("basicThreadPool.getMaxSize() = " + basicThreadPool.getMaxSize());
  29. System.out.println("==========");
  30. TimeUnit.SECONDS.sleep(5);
  31. }*/
  32. TimeUnit.SECONDS.sleep(15);
  33. //12s后shutdown
  34. basicThreadPool.shutdown();
  35. TimeUnit.SECONDS.sleep(1);
  36. System.out.println("isShutdown:" + basicThreadPool.isShutdown());
  37. //java.lang.IllegalStateException: 线程池已销毁
  38. // System.out.println("QueueSize:" + basicThreadPool.getQueueSize());
  39. //阻塞线程 - jStack -定位原因
  40. Thread.currentThread().join();
  41. }
  42. }

Test-Out

  1. //线程池动态扩张情况及任务执行情况: 输出
  2. basicThreadPool.getActiveCount() = 2
  3. basicThreadPool.getQueueSize() = 18
  4. basicThreadPool.getCoreSize() = 4
  5. basicThreadPool.getMaxSize() = 6
  6. ==========
  7. basicThreadPool.getActiveCount() = 2
  8. basicThreadPool.getQueueSize() = 18
  9. basicThreadPool.getCoreSize() = 4
  10. basicThreadPool.getMaxSize() = 6
  11. ==========
  12. basicThreadPool.getActiveCount() = 4
  13. thread-pool-0已启动
  14. thread-pool-1已启动
  15. basicThreadPool.getQueueSize() = 16
  16. basicThreadPool.getCoreSize() = 4
  17. basicThreadPool.getMaxSize() = 6
  18. ==========
  19. basicThreadPool.getActiveCount() = 4
  20. basicThreadPool.getQueueSize() = 14
  21. basicThreadPool.getCoreSize() = 4
  22. basicThreadPool.getMaxSize() = 6
  23. ==========
  24. thread-pool-2已启动
  25. thread-pool-3已启动
  26. thread-pool-1已启动
  27. thread-pool-0已启动
  28. basicThreadPool.getActiveCount() = 6
  29. basicThreadPool.getQueueSize() = 8
  30. basicThreadPool.getCoreSize() = 4
  31. basicThreadPool.getMaxSize() = 6
  32. ==========
  33. basicThreadPool.getActiveCount() = 6
  34. basicThreadPool.getQueueSize() = 8
  35. basicThreadPool.getCoreSize() = 4
  36. basicThreadPool.getMaxSize() = 6
  37. ==========
  38. thread-pool-2已启动
  39. thread-pool-5已启动
  40. thread-pool-3已启动
  41. thread-pool-4已启动
  42. thread-pool-0已启动
  43. thread-pool-1已启动
  44. basicThreadPool.getActiveCount() = 6
  45. basicThreadPool.getQueueSize() = 2
  46. basicThreadPool.getCoreSize() = 4
  47. basicThreadPool.getMaxSize() = 6
  48. ==========
  49. basicThreadPool.getActiveCount() = 6
  50. basicThreadPool.getQueueSize() = 2
  51. basicThreadPool.getCoreSize() = 4
  52. basicThreadPool.getMaxSize() = 6
  53. ==========
  54. thread-pool-4已启动
  55. thread-pool-3已启动
  56. thread-pool-2已启动
  57. thread-pool-5已启动
  58. thread-pool-1已启动
  59. thread-pool-0已启动
  60. basicThreadPool.getActiveCount() = 6
  61. basicThreadPool.getQueueSize() = 0
  62. basicThreadPool.getCoreSize() = 4
  63. basicThreadPool.getMaxSize() = 6
  64. ==========
  65. basicThreadPool.getActiveCount() = 6
  66. basicThreadPool.getQueueSize() = 0
  67. basicThreadPool.getCoreSize() = 4
  68. basicThreadPool.getMaxSize() = 6
  69. ==========
  70. thread-pool-4已启动
  71. thread-pool-3已启动
  72. basicThreadPool.getActiveCount() = 5
  73. basicThreadPool.getQueueSize() = 0
  74. basicThreadPool.getCoreSize() = 4
  75. basicThreadPool.getMaxSize() = 6
  76. ==========
  77. basicThreadPool.getActiveCount() = 5
  78. basicThreadPool.getQueueSize() = 0
  79. basicThreadPool.getCoreSize() = 4
  80. basicThreadPool.getMaxSize() = 6
  81. ==========
  82. basicThreadPool.getActiveCount() = 4
  83. basicThreadPool.getQueueSize() = 0
  84. basicThreadPool.getCoreSize() = 4
  85. basicThreadPool.getMaxSize() = 6
  86. ==========
  87. basicThreadPool.getActiveCount() = 4
  88. basicThreadPool.getQueueSize() = 0
  89. basicThreadPool.getCoreSize() = 4
  90. basicThreadPool.getMaxSize() = 6
  91. ==========
  92. //线程池销毁 - 输出
  93. thread-pool-0 已启动
  94. thread-pool-1 已启动
  95. java.lang.InterruptedException: sleep interrupted
  96. at java.lang.Thread.sleep(Native Method)
  97. at java.lang.Thread.sleep(Thread.java:340)
  98. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  99. at com.zoro.concurrent.chapter08.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:19)
  100. at com.zoro.concurrent.chapter08.threadpool.InternalTask.run(InternalTask.java:25)
  101. at java.lang.Thread.run(Thread.java:748)
  102. java.lang.InterruptedException: sleep interrupted
  103. at java.lang.Thread.sleep(Native Method)
  104. at java.lang.Thread.sleep(Thread.java:340)
  105. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  106. at com.zoro.concurrent.chapter08.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:19)
  107. at com.zoro.concurrent.chapter08.threadpool.InternalTask.run(InternalTask.java:25)
  108. at java.lang.Thread.run(Thread.java:748)
  109. java.lang.InterruptedException: sleep interrupted
  110. at java.lang.Thread.sleep(Native Method)
  111. at java.lang.Thread.sleep(Thread.java:340)
  112. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  113. at com.zoro.concurrent.chapter08.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:19)
  114. at com.zoro.concurrent.chapter08.threadpool.InternalTask.run(InternalTask.java:25)
  115. at java.lang.Thread.run(Thread.java:748)
  116. java.lang.InterruptedException: sleep interrupted
  117. at java.lang.Thread.sleep(Native Method)
  118. at java.lang.Thread.sleep(Thread.java:340)
  119. at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
  120. at com.zoro.concurrent.chapter08.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:19)
  121. at com.zoro.concurrent.chapter08.threadpool.InternalTask.run(InternalTask.java:25)
  122. at java.lang.Thread.run(Thread.java:748)
  123. isShutdown:true
  124. Exception in thread "main" java.lang.IllegalStateException: 线程池已销毁
  125. at com.zoro.concurrent.chapter08.threadpool.BasicThreadPool.getQueueSize(BasicThreadPool.java:204)
  126. at com.zoro.concurrent.chapter08.ThreadPoolTest.main(ThreadPoolTest.java:42)

最后

存在问题 【待优化项】

  • BasicThreadPool和Thread不应为继承关系,组合会更恰当;这样可以避免使用者直接调用Thread中的方法。
  • 线程池的销毁功能并未返回未被处理的任务,这样会导致未被处理的任务丢弃。
  • BasicThreadPool的构造函数参数过多,建议使用builder和设计模式对其封装或提供工厂方法进行构造。
  • 线程池中的数量控制没有进行合法性校验,如:initSize不应该大于maxSize。
  • 其他……