6.1 线程组

在线程组中定义一组相似或者相关的线程,在线程组中也可以定义子线程组。
Thread类在允许创建线程时指定线程组,如果在创建线程时没有指定线程组就属于父线程所在的线程组。JVM在创建main线程时会为它指定一个线程组,因此每个Java线程都有一个线程组与之关联,可以调用线程的getThreadGroup()方法返回线程组。
线程组开始是出于安全考虑设计用来区分不同的Applet,然而ThreadGroup并未实现这一目标,在新开发的系统中,已经不常用线程组。现在一般会将一组相关的线程存入一个数组或者集合中,如果仅仅是用来区分线程时,可以使用线程名称来区分,多数情况下,可以忽略线程组。

6.1.1 创建线程组

  1. public class Test01 {
  2. public static void main(String[] args) {
  3. // 返回当前main线程的线程组
  4. ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
  5. // java.lang.ThreadGroup[name=main,maxpri=10]
  6. System.out.println(mainGroup);
  7. // 定义线程组,如果不指定所属线程组,刚自动归属当前线程所属的线程组中
  8. ThreadGroup group1 = new ThreadGroup("group1");
  9. // java.lang.ThreadGroup[name=group1,maxpri=10]
  10. System.out.println(group1);
  11. // 定义线程组,同时指定父线程组
  12. ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");
  13. // true
  14. System.out.println(group1.getParent() == mainGroup);
  15. // true
  16. System.out.println(group2.getParent() == mainGroup);
  17. Runnable r = ()->System.out.println(Thread.currentThread());
  18. // 在创建线程时,如果没有指定线程组,则默认归属到父线程的线程组中
  19. Thread t1 = new Thread(r, "t1");
  20. // Thread[t1,5,main]
  21. System.out.println(t1);
  22. // 创建线程时,可以指定线程所属线程组
  23. Thread t2 = new Thread(group1, r, "t2");
  24. Thread t3 = new Thread(group2, r, "t3");
  25. // Thread[t2,5,group1]
  26. System.out.println(t2);
  27. // Thread[t3,5,group2]
  28. System.out.println(t3);
  29. }
  30. }

6.1.2 线程组的基本操作

  1. activeCount():返回当前线程组及子线程组中活动线程的数量(近似值);
  2. activeGroupCount():返回当前线程组及子线程组中活动线程组的数量(近似值);
  3. int enumerate(Thread[] list):将当前线程组中的活动线程复制到参数数组中;
  4. int enumerate(ThreadGroup[] list):将当前线程组中活动线程组复制到参数数组中;
  5. int getMaxPriority():返回线程组的最大优先组,默认是10;
  6. String getName():返回线程组的名称;
  7. ThreadGroup getParent():返回父线程组;
  8. interrupt():中断线程组中所有线程;
  9. boolean isDaemon():判断当前线程组是否为守护线程组;
  10. list():将当前线程组中的活动线程打印 到屏幕上;
  11. parentOf(ThreadGroup g):判断当前线程是否为参数线程的父线程组;
  12. setDaemon(boolean daemon):设置当前线程为守护线程组。

    6.2 捕获线程的执行异常

    在线程的run方法中,如果有受检异常必须进行捕获处理,如果想要获得run()方法中出现的运行时异常信息,可以通过回调UncaughtExceptionHandler接口获得哪个线程出现了运行时异常。在Thread类中有关处理运行异常的方法有:

  13. getDefaultUncaughtExceptionHandler():获得全局的(默认的)UnCaughtExceptionHandler;

  14. getUncaughtExceptonHandler():静态方法,获得当前线程的UnCaughtExceptionHandler;
  15. setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置全局的UnCaughtExceptionHandler;
  16. setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置当前线程的UnCaughtExceptionHandler;

当线程运行过程中出现异常,JVM会调用Thread类的dispatchUncaughtException(Throwable e)方法,该方法会调用getUncaughtExceptonHandler().uncaughtException(this, e);如果想要获得线程中出现异常的信息,就需要设置线程的UncaughtExceptionHandler。

  1. public class Test01 {
  2. /**
  3. * 输出:
  4. * Thread-0开始运行
  5. * Thread-1线程产生了异常:null
  6. * Thread-0线程产生了异常:/ by zero
  7. */
  8. public static void main(String[] args) {
  9. // 1) 设置线程全局回调接口
  10. Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  11. @Override
  12. public void uncaughtException(Thread t, Throwable e) {
  13. // t参数接收发生异常的线程,e就是该线程中的异常
  14. System.out.println(t.getName() + "线程产生了异常:" + e.getMessage());
  15. }
  16. });
  17. Thread t1 = new Thread(()->{
  18. System.out.println(Thread.currentThread().getName() + "开始运行");
  19. try {
  20. Thread.sleep(2000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(12/0);
  25. });
  26. t1.start();
  27. new Thread(()->{
  28. String text = null;
  29. System.out.println(text.length());
  30. }).start();
  31. }
  32. }

6.3 注入Hook钩子线程

目的是交验进程是否已经启动,防止重复启动程序。Hook线程也称为钩子线程,当JVM退出的时候会执行Hook线程,在程序启动时会创建一个.lock文件,用.lock文件校验程序是否启动,在程序退出(JVM退出)时删除.lock文件。在Hook线程中除了防止重新启动进程外,还可以做资源释放,尽量避免在Hook线程中进行复杂的操作。

  1. public class Test {
  2. public static void main(String[] args) {
  3. // 1) 注入Hook线程,在程序退出时删除.lock文件
  4. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  5. System.out.println("JVM退出,会启动当前Hook线程,在Hook线程中删除.lock文件");
  6. getLockFile().toFile().delete();
  7. }));
  8. // 2) 程序运行时,检查lock文件是否存在,如果lock存在,则抛出异常
  9. if (getLockFile().toFile().exists()){
  10. throw new RuntimeException("程序已启动");
  11. } else {
  12. try {
  13. getLockFile().toFile().createNewFile();
  14. System.out.println("程序在启动时创建了lock文件");
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. int time = 10;
  20. for (int i = 0; i < time; i++) {
  21. System.out.println("程序正在运行");
  22. try {
  23. TimeUnit.SECONDS.sleep(1);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. private static Path getLockFile(){
  30. return Paths.get("", "tmp.lock");
  31. }
  32. }

6.4 线程池

6.4.1 基本概念

一个线程在run()方法运行结束后,线程对象会被GC释放。在真实生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时,反而会耗尽CPU资源,如果不对线程进行控制与管理,反而会影响程序的性能。线程开销主要包括:

  1. 创建与启动线程的开销;
  2. 线程销毁开销;
  3. 线程调度的开销。

线程池就是有效使用线程的一种常用方式,线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中,线程池中的工作线程不断地从队列中取出任务并执行。

6.4.2 JDK对线程池的支持

JDK提供了一套Executor框架,可以帮助开发人员有效的使用线程池。

  1. /**
  2. * 基本使用
  3. */
  4. public class Test01 {
  5. public static void main(String[] args) {
  6. // 创建线程池
  7. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
  8. // 向线程池中提交18个任务
  9. int taskNum = 18;
  10. for (int i = 0; i < taskNum; i++){
  11. fixedThreadPool.execute(()->{
  12. System.out.println(Thread.currentThread().getId() + "编号的任务在执行,开始时间:" + System.currentTimeMillis());
  13. try {
  14. Thread.sleep(3000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. }
  20. }
  21. }
  22. /**
  23. * 线程池的计划任务
  24. */
  25. public class Test02 {
  26. public static void main(String[] args) {
  27. int threadNum = 10;
  28. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadNum);
  29. // 在延迟2秒后执行任务
  30. scheduledExecutorService.schedule(()->{
  31. System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
  32. }, 2, TimeUnit.SECONDS);
  33. // 以固定的频率执行任务,开启任务的时间是固定的:在3秒后执行任务,以后每隔2秒重新执行一次
  34. scheduledExecutorService.scheduleAtFixedRate(
  35. ()-> {
  36. System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
  37. try {
  38. // 如果任务执行时长超过了时间时隔,则任务完成后立即开启下一个任务
  39. TimeUnit.SECONDS.sleep(3);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. },
  44. 3, 2, TimeUnit.SECONDS);
  45. // 在上次任务结束后,在固定延迟后再次执行该任务,不管执行任务耗时多长,总是在任务结束后的固定时间间隔后开启
  46. scheduledExecutorService.scheduleWithFixedDelay(
  47. ()-> {
  48. System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
  49. try {
  50. TimeUnit.SECONDS.sleep(3);
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. },
  55. 3, 2, TimeUnit.SECONDS);
  56. }
  57. }

6.4.3 核心线程池的底层实现

Executors工具类中返回线程池的方法都使用了ThreadPoolExecutor线程池,都是ThreadPoolExecutor线程池的封装。
ThreadPoolExecutor的构造方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)
  1. corePoolSize:指定线程池中核心线程的数量;
  2. maximumPoolSize:指定线程池中最大线程数量;
  3. keepAliveTime:当前线程池的数量超过corePoolSize时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁;
  4. unit:keepAliveTime时长单位;
  5. workQueue:任务队列,把任务提交到该任务队列中等待执行;
    1. 直接提交队列,由SynchronousQueue对象提供,该队列没有容量,提交给线程池的任务不会被真实保存,总是将新任务提交给线程。如果没有空闲线程,则尝试创建新线程;如果线程数量已经达到maximumPoolSize规定的最大值则执行拒绝策略。该线程池在极端的情况下,每次提交新的任务都会创建新的线程执行,适合用来执行大量耗时短并且提交频繁的任务。
    2. 有界任务队列:由ArrayBlockingQueue实现,在创建ArrayBlockingQueue对象时,需要指定一个容量。当有任务需要执行时,如果线程池中线程数小于corePoolSize核心线程数,则创建新的线程;如果大于corePoolSize核心线程数,则加入等待队列;如果队列已满,则无法加入;在线程数小于maxinumPoolSize指定的最大线程数前提下,会创建新的线程来执行;如果线程数大于maximumPoolSize最大线程数,则执行拒绝策略。
    3. 无界任务队列:由LinkedBlockingQueue实现,如果不指定容量,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。当有新任务时,在系统线程数小于corePoolSize核心线程数,则创建新的线程来执行任务;当线程池中线程数量大于corePoolSize核心线程,则把任务加入阻塞队列。
    4. 优先任务队列:由PriorityBlockingQueue实现,是带有任务优先级的队列,是一个特殊的无界队列。不管是ArrayBlockingQueue,还是LinkedBlockingQueue都是按照先进先出算法处理任务的,在PriorityBlockingQueue队列中可以根据任务优先级顺序先后执行。
  6. threadFactory:用于创建线程;
  7. handler:拒绝策略,当任务太多来不及处理时,如何拒绝。

    6.4.4 拒绝策略

    当提交给线程池的任务超过实际承载能力时,即线程池中的线程已经用完时,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题,JDK提供了四种拒绝策略:

  8. AbortPolicy策略(默认策略)会抛出异常;

  9. CallerRunPolicy策略只要线程池没有关闭,会在调用者线程中运行当前被丢弃的任务;
  10. DiscardOldestPolicy策略会将任务队列中最老的任务丢弃,尝试再次提交新任务;
  11. DiscardPolicy直接丢弃这个无法处理的任务。

如果内置的拒绝策略无法满足实际需求,可以扩展RejectedExecutionHandler接口:

  1. public class Test03 {
  2. public static void main(String[] args) {
  3. // 定义任务
  4. Runnable r = () -> {
  5. int num = new Random().nextInt(5);
  6. System.out.println(Thread.currentThread().getId()
  7. + " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");
  8. try {
  9. TimeUnit.SECONDS.sleep(num);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. };
  14. // 创建线程池
  15. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,
  16. TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), (r1, executor) -> {
  17. // r就是请求的任务,executor就是当前线程池
  18. System.out.println(r1 + " is discarding");
  19. });
  20. // 向线程池提交若干任务
  21. for (int i = 0; i < Integer.MAX_VALUE; i++) {
  22. threadPoolExecutor.submit(r);
  23. }
  24. }
  25. }

6.4.5 ThreadFactory

线程池中的线程从ThreadFactory被创建,ThreadFactory是一个接口,只有一个用来创建线程的方法:Thread newThread(Runnable r)

  1. public class Test04 {
  2. public static void main(String[] args) {
  3. // 定义任务
  4. Runnable r = () -> {
  5. int num = new Random().nextInt(5);
  6. System.out.println(Thread.currentThread().getId()
  7. + " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");
  8. try {
  9. TimeUnit.SECONDS.sleep(num);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. };
  14. // 创建线程池,使用自定义线程工厂
  15. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,
  16. TimeUnit.SECONDS, new SynchronousQueue<>(), r1 -> {
  17. Thread thread = new Thread(r1);
  18. // 设置为守护线程,当主线程运行结束,线程池中的线程会自动退出
  19. thread.setDaemon(true);
  20. System.out.println("创建了线程:" + thread);
  21. return thread;
  22. });
  23. // 向线程池提交若干任务,当任务大于5时,线程池执行默认的拒绝策略,抛出异常
  24. int threadNum = 5;
  25. for (int i = 0; i < threadNum; i++) {
  26. threadPoolExecutor.submit(r);
  27. }
  28. try {
  29. Thread.sleep(10000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }

6.4.6 监控线程池

ThreadPoolExecutor提供了一组方法用于监控线程池:

  1. int getActiveCount():获得线程池中当前线程的数量;
  2. long getCompletedTaskCount():返回线程池完成任务的数量;
  3. int getCorePoolSize():返回核心线程数量;
  4. int getLargestPoolSize():返回线程池曾经达到的线程的最大数;
  5. int getMaximumPoolSize():返回线程池的最大容量;
  6. int getPoolSize():当前线程池的大小;
  7. BlockingQueue getQueue():返回阻塞队列;
  8. long getTaskCount():返回线程池收到的任务总数。

    6.4.7 扩展线程池

    ThreadPoolExecutor线程池提供了两个方法:

  9. protected void afterExecute(Runnable r, Throwable t);

  10. protected void beforeExecute(Thread t, Runnable r);

在线程池执行某个任务前会调用beforeExecute()方法,在任务结束会(任务异常退出)会执行afterExecute()方法。在ThreadPoolExecutor类中定义了一个内部类Worker,ThreadPoolExecutor线程池中工作线程就是Worker类的实例,Worker实例在执行时也会调用beforeExecute()与afterExecute()方法。

  1. public class Test05 {
  2. private static class MyTask implements Runnable {
  3. private String name;
  4. public MyTask(String name){
  5. this.name = name;
  6. }
  7. @Override
  8. public void run() {
  9. System.out.println(name + "任务正在被线程" + Thread.currentThread().getId() + "执行");
  10. try {
  11. Thread.sleep(1000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. public static void main(String[] args) {
  18. // 定义扩展线程池,可以定义线程池继承ThreadPoolExecutor,在子类中重写beforeExecute()/afterExecute()方法
  19. // 也可以直接使用ThreadPoolExecutor的内部类
  20. ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0,
  21. TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory()){
  22. @Override
  23. protected void beforeExecute(Thread t, Runnable r) {
  24. System.out.println(t.getId() + "线程准备执行任务:" + ((MyTask)r).name);
  25. }
  26. @Override
  27. protected void afterExecute(Runnable r, Throwable t) {
  28. System.out.println(((MyTask)r).name + "任务执行完毕");
  29. }
  30. @Override
  31. protected void terminated() {
  32. System.out.println("线程池退出");
  33. }
  34. };
  35. int taskNum = 5;
  36. for (int i = 0; i < taskNum; i++) {
  37. MyTask task = new MyTask("task-" + i);
  38. executorService.execute(task);
  39. }
  40. // 关闭线程池,仅仅是说不再接收新的任务
  41. executorService.shutdown();
  42. }
  43. }

6.4.8 优化线程池数量

线程池大小对系统性能有一定影响,过大或者过小都会无法发挥最优的系统

性能,线程池大小不需要非常精确,只要避免极大或者极小的情况即可。一般来说,线程池大小需要考虑CPU数量、内存大小等因素,在《Java Concurrent in Practice》书中给出一个估算线程池大小公式:
线程池大小 = CPU的数量 目标CPU的使用率 (1 + 等待时间与计算时间的比)

6.4.9 线程池死锁

如果在线程池中执行的任务A在执行过程中又向线程池提交了任务B,任务B添加了线程池的等待队列中,如果任务A的结束需要等待任务B的执行结果,就有可能会出现这种情况。
线程池适合提交相互独立的任务,而不是相互依赖的任务。

6.4.10 线程池的异常处理

线程池运行过程,线程出现异常却未输出错误信息,如线程运行出现零除异常,解决方法如下:

  1. 使用execute(),而不是submit()。
  2. 对ThreadPoolExecutor进行扩展,对运行的任务进行包装。将任务放入新方法wrap()返回的Runnable接口的run()方法的try-catch块中,在catch中重新抛出异常,然后重写submit()方法,将wrap()接口返回的Runnable接口传入submit()方法中。

    6.4.11 ForkJoinPool线程池

    “分而治之”是一个有效的处理大数据的方法,著名的MapReduce就是采用这种思路。系统对ForkJoinPool线程池进行了优化,把一个大任务调用fork()方法分解为若干个小的任务,把小任务的处理结果进行join()合并为大任务的结果。提交的任务数量与线程的数量不一定是一对一关系,在多数情况下,一个物理线程实际上需要处理多个逻辑任务。ForkJoinPool线程池中最常用的方法是: ForkJoinTask submit(ForkJoinTask task)向线程池提交一个ForkJoinTask任务,ForkJoinTask任务支持fork()分解与join()等待的任务。ForkJoinTask有两个重要的子类:RecursiveAction和RecursiveTask,它们的区别在于RecursiveAction任务没有返回值,RecursiveTask任务可以带有返回值。

    1. public class Test06 {
    2. /**
    3. * 计算数列的和,需要返回结果,可以定义任务继承RecursiveTask
    4. */
    5. private static class CountTask extends RecursiveTask<Long> {
    6. /**
    7. * 当个任务量阀值
    8. */
    9. private static final int THRESHOLD = 10000;
    10. /**
    11. * 分解任务数
    12. */
    13. private static final int TASK_NUM = 100;
    14. private long start;
    15. private long end;
    16. public CountTask(long start, long end) {
    17. this.start = start;
    18. this.end = end;
    19. }
    20. @Override
    21. protected Long compute() {
    22. long sum = 0;
    23. if (end - start < THRESHOLD) {
    24. for (long i = start; i <= end; i++) {
    25. sum += i;
    26. }
    27. } else {
    28. // 拆分任务
    29. long step = (start + end) / 100;
    30. ArrayList<CountTask> subTaskList = new ArrayList<>();
    31. long pos = start;
    32. for (int i = 0; i < TASK_NUM; i++) {
    33. long lastOne = pos + step;
    34. if (lastOne > end) {
    35. lastOne = end;
    36. }
    37. CountTask task = new CountTask(pos, lastOne);
    38. subTaskList.add(task);
    39. // 调用fork()提交子任务
    40. task.fork();
    41. pos += step + 1;
    42. }
    43. // 合并结果
    44. for (CountTask task :
    45. subTaskList) {
    46. sum += task.join();
    47. }
    48. }
    49. return sum;
    50. }
    51. }
    52. public static void main(String[] args) {
    53. ForkJoinPool forkJoinPool = new ForkJoinPool();
    54. CountTask task = new CountTask(0L, 200000L);
    55. ForkJoinTask<Long> result = forkJoinPool.submit(task);
    56. Long res = null;
    57. try {
    58. res = result.get();
    59. System.out.println("计算数列结果为:" + res);
    60. } catch (InterruptedException e) {
    61. e.printStackTrace();
    62. } catch (ExecutionException e) {
    63. e.printStackTrace();
    64. }
    65. }
    66. }