6.1 线程组
在线程组中定义一组相似或者相关的线程,在线程组中也可以定义子线程组。
Thread类在允许创建线程时指定线程组,如果在创建线程时没有指定线程组就属于父线程所在的线程组。JVM在创建main线程时会为它指定一个线程组,因此每个Java线程都有一个线程组与之关联,可以调用线程的getThreadGroup()方法返回线程组。
线程组开始是出于安全考虑设计用来区分不同的Applet,然而ThreadGroup并未实现这一目标,在新开发的系统中,已经不常用线程组。现在一般会将一组相关的线程存入一个数组或者集合中,如果仅仅是用来区分线程时,可以使用线程名称来区分,多数情况下,可以忽略线程组。
6.1.1 创建线程组
public class Test01 {public static void main(String[] args) {// 返回当前main线程的线程组ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();// java.lang.ThreadGroup[name=main,maxpri=10]System.out.println(mainGroup);// 定义线程组,如果不指定所属线程组,刚自动归属当前线程所属的线程组中ThreadGroup group1 = new ThreadGroup("group1");// java.lang.ThreadGroup[name=group1,maxpri=10]System.out.println(group1);// 定义线程组,同时指定父线程组ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");// trueSystem.out.println(group1.getParent() == mainGroup);// trueSystem.out.println(group2.getParent() == mainGroup);Runnable r = ()->System.out.println(Thread.currentThread());// 在创建线程时,如果没有指定线程组,则默认归属到父线程的线程组中Thread t1 = new Thread(r, "t1");// Thread[t1,5,main]System.out.println(t1);// 创建线程时,可以指定线程所属线程组Thread t2 = new Thread(group1, r, "t2");Thread t3 = new Thread(group2, r, "t3");// Thread[t2,5,group1]System.out.println(t2);// Thread[t3,5,group2]System.out.println(t3);}}
6.1.2 线程组的基本操作
- activeCount():返回当前线程组及子线程组中活动线程的数量(近似值);
- activeGroupCount():返回当前线程组及子线程组中活动线程组的数量(近似值);
- int enumerate(Thread[] list):将当前线程组中的活动线程复制到参数数组中;
- int enumerate(ThreadGroup[] list):将当前线程组中活动线程组复制到参数数组中;
- int getMaxPriority():返回线程组的最大优先组,默认是10;
- String getName():返回线程组的名称;
- ThreadGroup getParent():返回父线程组;
- interrupt():中断线程组中所有线程;
- boolean isDaemon():判断当前线程组是否为守护线程组;
- list():将当前线程组中的活动线程打印 到屏幕上;
- parentOf(ThreadGroup g):判断当前线程是否为参数线程的父线程组;
setDaemon(boolean daemon):设置当前线程为守护线程组。
6.2 捕获线程的执行异常
在线程的run方法中,如果有受检异常必须进行捕获处理,如果想要获得run()方法中出现的运行时异常信息,可以通过回调UncaughtExceptionHandler接口获得哪个线程出现了运行时异常。在Thread类中有关处理运行异常的方法有:
getDefaultUncaughtExceptionHandler():获得全局的(默认的)UnCaughtExceptionHandler;
- getUncaughtExceptonHandler():静态方法,获得当前线程的UnCaughtExceptionHandler;
- setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置全局的UnCaughtExceptionHandler;
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置当前线程的UnCaughtExceptionHandler;
当线程运行过程中出现异常,JVM会调用Thread类的dispatchUncaughtException(Throwable e)方法,该方法会调用getUncaughtExceptonHandler().uncaughtException(this, e);如果想要获得线程中出现异常的信息,就需要设置线程的UncaughtExceptionHandler。
public class Test01 {/*** 输出:* Thread-0开始运行* Thread-1线程产生了异常:null* Thread-0线程产生了异常:/ by zero*/public static void main(String[] args) {// 1) 设置线程全局回调接口Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {@Overridepublic void uncaughtException(Thread t, Throwable e) {// t参数接收发生异常的线程,e就是该线程中的异常System.out.println(t.getName() + "线程产生了异常:" + e.getMessage());}});Thread t1 = new Thread(()->{System.out.println(Thread.currentThread().getName() + "开始运行");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(12/0);});t1.start();new Thread(()->{String text = null;System.out.println(text.length());}).start();}}
6.3 注入Hook钩子线程
目的是交验进程是否已经启动,防止重复启动程序。Hook线程也称为钩子线程,当JVM退出的时候会执行Hook线程,在程序启动时会创建一个.lock文件,用.lock文件校验程序是否启动,在程序退出(JVM退出)时删除.lock文件。在Hook线程中除了防止重新启动进程外,还可以做资源释放,尽量避免在Hook线程中进行复杂的操作。
public class Test {public static void main(String[] args) {// 1) 注入Hook线程,在程序退出时删除.lock文件Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("JVM退出,会启动当前Hook线程,在Hook线程中删除.lock文件");getLockFile().toFile().delete();}));// 2) 程序运行时,检查lock文件是否存在,如果lock存在,则抛出异常if (getLockFile().toFile().exists()){throw new RuntimeException("程序已启动");} else {try {getLockFile().toFile().createNewFile();System.out.println("程序在启动时创建了lock文件");} catch (IOException e) {e.printStackTrace();}}int time = 10;for (int i = 0; i < time; i++) {System.out.println("程序正在运行");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}private static Path getLockFile(){return Paths.get("", "tmp.lock");}}
6.4 线程池
6.4.1 基本概念
一个线程在run()方法运行结束后,线程对象会被GC释放。在真实生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时,反而会耗尽CPU资源,如果不对线程进行控制与管理,反而会影响程序的性能。线程开销主要包括:
- 创建与启动线程的开销;
- 线程销毁开销;
- 线程调度的开销。
线程池就是有效使用线程的一种常用方式,线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中,线程池中的工作线程不断地从队列中取出任务并执行。
6.4.2 JDK对线程池的支持
JDK提供了一套Executor框架,可以帮助开发人员有效的使用线程池。
/*** 基本使用*/public class Test01 {public static void main(String[] args) {// 创建线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);// 向线程池中提交18个任务int taskNum = 18;for (int i = 0; i < taskNum; i++){fixedThreadPool.execute(()->{System.out.println(Thread.currentThread().getId() + "编号的任务在执行,开始时间:" + System.currentTimeMillis());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}});}}}/*** 线程池的计划任务*/public class Test02 {public static void main(String[] args) {int threadNum = 10;ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadNum);// 在延迟2秒后执行任务scheduledExecutorService.schedule(()->{System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());}, 2, TimeUnit.SECONDS);// 以固定的频率执行任务,开启任务的时间是固定的:在3秒后执行任务,以后每隔2秒重新执行一次scheduledExecutorService.scheduleAtFixedRate(()-> {System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());try {// 如果任务执行时长超过了时间时隔,则任务完成后立即开启下一个任务TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}},3, 2, TimeUnit.SECONDS);// 在上次任务结束后,在固定延迟后再次执行该任务,不管执行任务耗时多长,总是在任务结束后的固定时间间隔后开启scheduledExecutorService.scheduleWithFixedDelay(()-> {System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}},3, 2, TimeUnit.SECONDS);}}
6.4.3 核心线程池的底层实现
Executors工具类中返回线程池的方法都使用了ThreadPoolExecutor线程池,都是ThreadPoolExecutor线程池的封装。
ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize:指定线程池中核心线程的数量;
- maximumPoolSize:指定线程池中最大线程数量;
- keepAliveTime:当前线程池的数量超过corePoolSize时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁;
- unit:keepAliveTime时长单位;
- workQueue:任务队列,把任务提交到该任务队列中等待执行;
- 直接提交队列,由SynchronousQueue对象提供,该队列没有容量,提交给线程池的任务不会被真实保存,总是将新任务提交给线程。如果没有空闲线程,则尝试创建新线程;如果线程数量已经达到maximumPoolSize规定的最大值则执行拒绝策略。该线程池在极端的情况下,每次提交新的任务都会创建新的线程执行,适合用来执行大量耗时短并且提交频繁的任务。
- 有界任务队列:由ArrayBlockingQueue实现,在创建ArrayBlockingQueue对象时,需要指定一个容量。当有任务需要执行时,如果线程池中线程数小于corePoolSize核心线程数,则创建新的线程;如果大于corePoolSize核心线程数,则加入等待队列;如果队列已满,则无法加入;在线程数小于maxinumPoolSize指定的最大线程数前提下,会创建新的线程来执行;如果线程数大于maximumPoolSize最大线程数,则执行拒绝策略。
- 无界任务队列:由LinkedBlockingQueue实现,如果不指定容量,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。当有新任务时,在系统线程数小于corePoolSize核心线程数,则创建新的线程来执行任务;当线程池中线程数量大于corePoolSize核心线程,则把任务加入阻塞队列。
- 优先任务队列:由PriorityBlockingQueue实现,是带有任务优先级的队列,是一个特殊的无界队列。不管是ArrayBlockingQueue,还是LinkedBlockingQueue都是按照先进先出算法处理任务的,在PriorityBlockingQueue队列中可以根据任务优先级顺序先后执行。
- threadFactory:用于创建线程;
handler:拒绝策略,当任务太多来不及处理时,如何拒绝。
6.4.4 拒绝策略
当提交给线程池的任务超过实际承载能力时,即线程池中的线程已经用完时,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题,JDK提供了四种拒绝策略:
AbortPolicy策略(默认策略)会抛出异常;
- CallerRunPolicy策略只要线程池没有关闭,会在调用者线程中运行当前被丢弃的任务;
- DiscardOldestPolicy策略会将任务队列中最老的任务丢弃,尝试再次提交新任务;
- DiscardPolicy直接丢弃这个无法处理的任务。
如果内置的拒绝策略无法满足实际需求,可以扩展RejectedExecutionHandler接口:
public class Test03 {public static void main(String[] args) {// 定义任务Runnable r = () -> {int num = new Random().nextInt(5);System.out.println(Thread.currentThread().getId()+ " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");try {TimeUnit.SECONDS.sleep(num);} catch (InterruptedException e) {e.printStackTrace();}};// 创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), (r1, executor) -> {// r就是请求的任务,executor就是当前线程池System.out.println(r1 + " is discarding");});// 向线程池提交若干任务for (int i = 0; i < Integer.MAX_VALUE; i++) {threadPoolExecutor.submit(r);}}}
6.4.5 ThreadFactory
线程池中的线程从ThreadFactory被创建,ThreadFactory是一个接口,只有一个用来创建线程的方法:Thread newThread(Runnable r)。
public class Test04 {public static void main(String[] args) {// 定义任务Runnable r = () -> {int num = new Random().nextInt(5);System.out.println(Thread.currentThread().getId()+ " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");try {TimeUnit.SECONDS.sleep(num);} catch (InterruptedException e) {e.printStackTrace();}};// 创建线程池,使用自定义线程工厂ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,TimeUnit.SECONDS, new SynchronousQueue<>(), r1 -> {Thread thread = new Thread(r1);// 设置为守护线程,当主线程运行结束,线程池中的线程会自动退出thread.setDaemon(true);System.out.println("创建了线程:" + thread);return thread;});// 向线程池提交若干任务,当任务大于5时,线程池执行默认的拒绝策略,抛出异常int threadNum = 5;for (int i = 0; i < threadNum; i++) {threadPoolExecutor.submit(r);}try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}}
6.4.6 监控线程池
ThreadPoolExecutor提供了一组方法用于监控线程池:
- int getActiveCount():获得线程池中当前线程的数量;
- long getCompletedTaskCount():返回线程池完成任务的数量;
- int getCorePoolSize():返回核心线程数量;
- int getLargestPoolSize():返回线程池曾经达到的线程的最大数;
- int getMaximumPoolSize():返回线程池的最大容量;
- int getPoolSize():当前线程池的大小;
- BlockingQueue getQueue():返回阻塞队列;
long getTaskCount():返回线程池收到的任务总数。
6.4.7 扩展线程池
ThreadPoolExecutor线程池提供了两个方法:
protected void afterExecute(Runnable r, Throwable t);
- protected void beforeExecute(Thread t, Runnable r);
在线程池执行某个任务前会调用beforeExecute()方法,在任务结束会(任务异常退出)会执行afterExecute()方法。在ThreadPoolExecutor类中定义了一个内部类Worker,ThreadPoolExecutor线程池中工作线程就是Worker类的实例,Worker实例在执行时也会调用beforeExecute()与afterExecute()方法。
public class Test05 {private static class MyTask implements Runnable {private String name;public MyTask(String name){this.name = name;}@Overridepublic void run() {System.out.println(name + "任务正在被线程" + Thread.currentThread().getId() + "执行");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {// 定义扩展线程池,可以定义线程池继承ThreadPoolExecutor,在子类中重写beforeExecute()/afterExecute()方法// 也可以直接使用ThreadPoolExecutor的内部类ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0,TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory()){@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println(t.getId() + "线程准备执行任务:" + ((MyTask)r).name);}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println(((MyTask)r).name + "任务执行完毕");}@Overrideprotected void terminated() {System.out.println("线程池退出");}};int taskNum = 5;for (int i = 0; i < taskNum; i++) {MyTask task = new MyTask("task-" + i);executorService.execute(task);}// 关闭线程池,仅仅是说不再接收新的任务executorService.shutdown();}}
6.4.8 优化线程池数量
线程池大小对系统性能有一定影响,过大或者过小都会无法发挥最优的系统
性能,线程池大小不需要非常精确,只要避免极大或者极小的情况即可。一般来说,线程池大小需要考虑CPU数量、内存大小等因素,在《Java Concurrent in Practice》书中给出一个估算线程池大小公式:
线程池大小 = CPU的数量 目标CPU的使用率 (1 + 等待时间与计算时间的比)
6.4.9 线程池死锁
如果在线程池中执行的任务A在执行过程中又向线程池提交了任务B,任务B添加了线程池的等待队列中,如果任务A的结束需要等待任务B的执行结果,就有可能会出现这种情况。
线程池适合提交相互独立的任务,而不是相互依赖的任务。
6.4.10 线程池的异常处理
线程池运行过程,线程出现异常却未输出错误信息,如线程运行出现零除异常,解决方法如下:
- 使用execute(),而不是submit()。
对ThreadPoolExecutor进行扩展,对运行的任务进行包装。将任务放入新方法wrap()返回的Runnable接口的run()方法的try-catch块中,在catch中重新抛出异常,然后重写submit()方法,将wrap()接口返回的Runnable接口传入submit()方法中。
6.4.11 ForkJoinPool线程池
“分而治之”是一个有效的处理大数据的方法,著名的MapReduce就是采用这种思路。系统对ForkJoinPool线程池进行了优化,把一个大任务调用fork()方法分解为若干个小的任务,把小任务的处理结果进行join()合并为大任务的结果。提交的任务数量与线程的数量不一定是一对一关系,在多数情况下,一个物理线程实际上需要处理多个逻辑任务。ForkJoinPool线程池中最常用的方法是:
ForkJoinTask submit(ForkJoinTask task)向线程池提交一个ForkJoinTask任务,ForkJoinTask任务支持fork()分解与join()等待的任务。ForkJoinTask有两个重要的子类:RecursiveAction和RecursiveTask,它们的区别在于RecursiveAction任务没有返回值,RecursiveTask任务可以带有返回值。 public class Test06 {/*** 计算数列的和,需要返回结果,可以定义任务继承RecursiveTask*/private static class CountTask extends RecursiveTask<Long> {/*** 当个任务量阀值*/private static final int THRESHOLD = 10000;/*** 分解任务数*/private static final int TASK_NUM = 100;private long start;private long end;public CountTask(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {long sum = 0;if (end - start < THRESHOLD) {for (long i = start; i <= end; i++) {sum += i;}} else {// 拆分任务long step = (start + end) / 100;ArrayList<CountTask> subTaskList = new ArrayList<>();long pos = start;for (int i = 0; i < TASK_NUM; i++) {long lastOne = pos + step;if (lastOne > end) {lastOne = end;}CountTask task = new CountTask(pos, lastOne);subTaskList.add(task);// 调用fork()提交子任务task.fork();pos += step + 1;}// 合并结果for (CountTask task :subTaskList) {sum += task.join();}}return sum;}}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();CountTask task = new CountTask(0L, 200000L);ForkJoinTask<Long> result = forkJoinPool.submit(task);Long res = null;try {res = result.get();System.out.println("计算数列结果为:" + res);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
