6.1 线程组
在线程组中定义一组相似或者相关的线程,在线程组中也可以定义子线程组。
Thread类在允许创建线程时指定线程组,如果在创建线程时没有指定线程组就属于父线程所在的线程组。JVM在创建main线程时会为它指定一个线程组,因此每个Java线程都有一个线程组与之关联,可以调用线程的getThreadGroup()方法返回线程组。
线程组开始是出于安全考虑设计用来区分不同的Applet,然而ThreadGroup并未实现这一目标,在新开发的系统中,已经不常用线程组。现在一般会将一组相关的线程存入一个数组或者集合中,如果仅仅是用来区分线程时,可以使用线程名称来区分,多数情况下,可以忽略线程组。
6.1.1 创建线程组
public class Test01 {
public static void main(String[] args) {
// 返回当前main线程的线程组
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
// java.lang.ThreadGroup[name=main,maxpri=10]
System.out.println(mainGroup);
// 定义线程组,如果不指定所属线程组,刚自动归属当前线程所属的线程组中
ThreadGroup group1 = new ThreadGroup("group1");
// java.lang.ThreadGroup[name=group1,maxpri=10]
System.out.println(group1);
// 定义线程组,同时指定父线程组
ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");
// true
System.out.println(group1.getParent() == mainGroup);
// true
System.out.println(group2.getParent() == mainGroup);
Runnable r = ()->System.out.println(Thread.currentThread());
// 在创建线程时,如果没有指定线程组,则默认归属到父线程的线程组中
Thread t1 = new Thread(r, "t1");
// Thread[t1,5,main]
System.out.println(t1);
// 创建线程时,可以指定线程所属线程组
Thread t2 = new Thread(group1, r, "t2");
Thread t3 = new Thread(group2, r, "t3");
// Thread[t2,5,group1]
System.out.println(t2);
// Thread[t3,5,group2]
System.out.println(t3);
}
}
6.1.2 线程组的基本操作
- activeCount():返回当前线程组及子线程组中活动线程的数量(近似值);
- activeGroupCount():返回当前线程组及子线程组中活动线程组的数量(近似值);
- int enumerate(Thread[] list):将当前线程组中的活动线程复制到参数数组中;
- int enumerate(ThreadGroup[] list):将当前线程组中活动线程组复制到参数数组中;
- int getMaxPriority():返回线程组的最大优先组,默认是10;
- String getName():返回线程组的名称;
- ThreadGroup getParent():返回父线程组;
- interrupt():中断线程组中所有线程;
- boolean isDaemon():判断当前线程组是否为守护线程组;
- list():将当前线程组中的活动线程打印 到屏幕上;
- parentOf(ThreadGroup g):判断当前线程是否为参数线程的父线程组;
setDaemon(boolean daemon):设置当前线程为守护线程组。
6.2 捕获线程的执行异常
在线程的run方法中,如果有受检异常必须进行捕获处理,如果想要获得run()方法中出现的运行时异常信息,可以通过回调UncaughtExceptionHandler接口获得哪个线程出现了运行时异常。在Thread类中有关处理运行异常的方法有:
getDefaultUncaughtExceptionHandler():获得全局的(默认的)UnCaughtExceptionHandler;
- getUncaughtExceptonHandler():静态方法,获得当前线程的UnCaughtExceptionHandler;
- setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置全局的UnCaughtExceptionHandler;
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):设置当前线程的UnCaughtExceptionHandler;
当线程运行过程中出现异常,JVM会调用Thread类的dispatchUncaughtException(Throwable e)方法,该方法会调用getUncaughtExceptonHandler().uncaughtException(this, e);如果想要获得线程中出现异常的信息,就需要设置线程的UncaughtExceptionHandler。
public class Test01 {
/**
* 输出:
* Thread-0开始运行
* Thread-1线程产生了异常:null
* Thread-0线程产生了异常:/ by zero
*/
public static void main(String[] args) {
// 1) 设置线程全局回调接口
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
// t参数接收发生异常的线程,e就是该线程中的异常
System.out.println(t.getName() + "线程产生了异常:" + e.getMessage());
}
});
Thread t1 = new Thread(()->{
System.out.println(Thread.currentThread().getName() + "开始运行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(12/0);
});
t1.start();
new Thread(()->{
String text = null;
System.out.println(text.length());
}).start();
}
}
6.3 注入Hook钩子线程
目的是交验进程是否已经启动,防止重复启动程序。Hook线程也称为钩子线程,当JVM退出的时候会执行Hook线程,在程序启动时会创建一个.lock文件,用.lock文件校验程序是否启动,在程序退出(JVM退出)时删除.lock文件。在Hook线程中除了防止重新启动进程外,还可以做资源释放,尽量避免在Hook线程中进行复杂的操作。
public class Test {
public static void main(String[] args) {
// 1) 注入Hook线程,在程序退出时删除.lock文件
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("JVM退出,会启动当前Hook线程,在Hook线程中删除.lock文件");
getLockFile().toFile().delete();
}));
// 2) 程序运行时,检查lock文件是否存在,如果lock存在,则抛出异常
if (getLockFile().toFile().exists()){
throw new RuntimeException("程序已启动");
} else {
try {
getLockFile().toFile().createNewFile();
System.out.println("程序在启动时创建了lock文件");
} catch (IOException e) {
e.printStackTrace();
}
}
int time = 10;
for (int i = 0; i < time; i++) {
System.out.println("程序正在运行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static Path getLockFile(){
return Paths.get("", "tmp.lock");
}
}
6.4 线程池
6.4.1 基本概念
一个线程在run()方法运行结束后,线程对象会被GC释放。在真实生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时,反而会耗尽CPU资源,如果不对线程进行控制与管理,反而会影响程序的性能。线程开销主要包括:
- 创建与启动线程的开销;
- 线程销毁开销;
- 线程调度的开销。
线程池就是有效使用线程的一种常用方式,线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中,线程池中的工作线程不断地从队列中取出任务并执行。
6.4.2 JDK对线程池的支持
JDK提供了一套Executor框架,可以帮助开发人员有效的使用线程池。
/**
* 基本使用
*/
public class Test01 {
public static void main(String[] args) {
// 创建线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 向线程池中提交18个任务
int taskNum = 18;
for (int i = 0; i < taskNum; i++){
fixedThreadPool.execute(()->{
System.out.println(Thread.currentThread().getId() + "编号的任务在执行,开始时间:" + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
/**
* 线程池的计划任务
*/
public class Test02 {
public static void main(String[] args) {
int threadNum = 10;
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadNum);
// 在延迟2秒后执行任务
scheduledExecutorService.schedule(()->{
System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
}, 2, TimeUnit.SECONDS);
// 以固定的频率执行任务,开启任务的时间是固定的:在3秒后执行任务,以后每隔2秒重新执行一次
scheduledExecutorService.scheduleAtFixedRate(
()-> {
System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
try {
// 如果任务执行时长超过了时间时隔,则任务完成后立即开启下一个任务
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3, 2, TimeUnit.SECONDS);
// 在上次任务结束后,在固定延迟后再次执行该任务,不管执行任务耗时多长,总是在任务结束后的固定时间间隔后开启
scheduledExecutorService.scheduleWithFixedDelay(
()-> {
System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3, 2, TimeUnit.SECONDS);
}
}
6.4.3 核心线程池的底层实现
Executors工具类中返回线程池的方法都使用了ThreadPoolExecutor线程池,都是ThreadPoolExecutor线程池的封装。
ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:指定线程池中核心线程的数量;
- maximumPoolSize:指定线程池中最大线程数量;
- keepAliveTime:当前线程池的数量超过corePoolSize时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁;
- unit:keepAliveTime时长单位;
- workQueue:任务队列,把任务提交到该任务队列中等待执行;
- 直接提交队列,由SynchronousQueue对象提供,该队列没有容量,提交给线程池的任务不会被真实保存,总是将新任务提交给线程。如果没有空闲线程,则尝试创建新线程;如果线程数量已经达到maximumPoolSize规定的最大值则执行拒绝策略。该线程池在极端的情况下,每次提交新的任务都会创建新的线程执行,适合用来执行大量耗时短并且提交频繁的任务。
- 有界任务队列:由ArrayBlockingQueue实现,在创建ArrayBlockingQueue对象时,需要指定一个容量。当有任务需要执行时,如果线程池中线程数小于corePoolSize核心线程数,则创建新的线程;如果大于corePoolSize核心线程数,则加入等待队列;如果队列已满,则无法加入;在线程数小于maxinumPoolSize指定的最大线程数前提下,会创建新的线程来执行;如果线程数大于maximumPoolSize最大线程数,则执行拒绝策略。
- 无界任务队列:由LinkedBlockingQueue实现,如果不指定容量,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。当有新任务时,在系统线程数小于corePoolSize核心线程数,则创建新的线程来执行任务;当线程池中线程数量大于corePoolSize核心线程,则把任务加入阻塞队列。
- 优先任务队列:由PriorityBlockingQueue实现,是带有任务优先级的队列,是一个特殊的无界队列。不管是ArrayBlockingQueue,还是LinkedBlockingQueue都是按照先进先出算法处理任务的,在PriorityBlockingQueue队列中可以根据任务优先级顺序先后执行。
- threadFactory:用于创建线程;
handler:拒绝策略,当任务太多来不及处理时,如何拒绝。
6.4.4 拒绝策略
当提交给线程池的任务超过实际承载能力时,即线程池中的线程已经用完时,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题,JDK提供了四种拒绝策略:
AbortPolicy策略(默认策略)会抛出异常;
- CallerRunPolicy策略只要线程池没有关闭,会在调用者线程中运行当前被丢弃的任务;
- DiscardOldestPolicy策略会将任务队列中最老的任务丢弃,尝试再次提交新任务;
- DiscardPolicy直接丢弃这个无法处理的任务。
如果内置的拒绝策略无法满足实际需求,可以扩展RejectedExecutionHandler接口:
public class Test03 {
public static void main(String[] args) {
// 定义任务
Runnable r = () -> {
int num = new Random().nextInt(5);
System.out.println(Thread.currentThread().getId()
+ " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), (r1, executor) -> {
// r就是请求的任务,executor就是当前线程池
System.out.println(r1 + " is discarding");
});
// 向线程池提交若干任务
for (int i = 0; i < Integer.MAX_VALUE; i++) {
threadPoolExecutor.submit(r);
}
}
}
6.4.5 ThreadFactory
线程池中的线程从ThreadFactory被创建,ThreadFactory是一个接口,只有一个用来创建线程的方法:Thread newThread(Runnable r)
。
public class Test04 {
public static void main(String[] args) {
// 定义任务
Runnable r = () -> {
int num = new Random().nextInt(5);
System.out.println(Thread.currentThread().getId()
+ " -- " + System.currentTimeMillis() + "开始睡眠" + num + "秒");
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 创建线程池,使用自定义线程工厂
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0,
TimeUnit.SECONDS, new SynchronousQueue<>(), r1 -> {
Thread thread = new Thread(r1);
// 设置为守护线程,当主线程运行结束,线程池中的线程会自动退出
thread.setDaemon(true);
System.out.println("创建了线程:" + thread);
return thread;
});
// 向线程池提交若干任务,当任务大于5时,线程池执行默认的拒绝策略,抛出异常
int threadNum = 5;
for (int i = 0; i < threadNum; i++) {
threadPoolExecutor.submit(r);
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
6.4.6 监控线程池
ThreadPoolExecutor提供了一组方法用于监控线程池:
- int getActiveCount():获得线程池中当前线程的数量;
- long getCompletedTaskCount():返回线程池完成任务的数量;
- int getCorePoolSize():返回核心线程数量;
- int getLargestPoolSize():返回线程池曾经达到的线程的最大数;
- int getMaximumPoolSize():返回线程池的最大容量;
- int getPoolSize():当前线程池的大小;
- BlockingQueue getQueue():返回阻塞队列;
long getTaskCount():返回线程池收到的任务总数。
6.4.7 扩展线程池
ThreadPoolExecutor线程池提供了两个方法:
protected void afterExecute(Runnable r, Throwable t);
- protected void beforeExecute(Thread t, Runnable r);
在线程池执行某个任务前会调用beforeExecute()方法,在任务结束会(任务异常退出)会执行afterExecute()方法。在ThreadPoolExecutor类中定义了一个内部类Worker,ThreadPoolExecutor线程池中工作线程就是Worker类的实例,Worker实例在执行时也会调用beforeExecute()与afterExecute()方法。
public class Test05 {
private static class MyTask implements Runnable {
private String name;
public MyTask(String name){
this.name = name;
}
@Override
public void run() {
System.out.println(name + "任务正在被线程" + Thread.currentThread().getId() + "执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 定义扩展线程池,可以定义线程池继承ThreadPoolExecutor,在子类中重写beforeExecute()/afterExecute()方法
// 也可以直接使用ThreadPoolExecutor的内部类
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getId() + "线程准备执行任务:" + ((MyTask)r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(((MyTask)r).name + "任务执行完毕");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
int taskNum = 5;
for (int i = 0; i < taskNum; i++) {
MyTask task = new MyTask("task-" + i);
executorService.execute(task);
}
// 关闭线程池,仅仅是说不再接收新的任务
executorService.shutdown();
}
}
6.4.8 优化线程池数量
线程池大小对系统性能有一定影响,过大或者过小都会无法发挥最优的系统
性能,线程池大小不需要非常精确,只要避免极大或者极小的情况即可。一般来说,线程池大小需要考虑CPU数量、内存大小等因素,在《Java Concurrent in Practice》书中给出一个估算线程池大小公式:
线程池大小 = CPU的数量 目标CPU的使用率 (1 + 等待时间与计算时间的比)
6.4.9 线程池死锁
如果在线程池中执行的任务A在执行过程中又向线程池提交了任务B,任务B添加了线程池的等待队列中,如果任务A的结束需要等待任务B的执行结果,就有可能会出现这种情况。
线程池适合提交相互独立的任务,而不是相互依赖的任务。
6.4.10 线程池的异常处理
线程池运行过程,线程出现异常却未输出错误信息,如线程运行出现零除异常,解决方法如下:
- 使用execute(),而不是submit()。
对ThreadPoolExecutor进行扩展,对运行的任务进行包装。将任务放入新方法wrap()返回的Runnable接口的run()方法的try-catch块中,在catch中重新抛出异常,然后重写submit()方法,将wrap()接口返回的Runnable接口传入submit()方法中。
6.4.11 ForkJoinPool线程池
“分而治之”是一个有效的处理大数据的方法,著名的MapReduce就是采用这种思路。系统对ForkJoinPool线程池进行了优化,把一个大任务调用fork()方法分解为若干个小的任务,把小任务的处理结果进行join()合并为大任务的结果。提交的任务数量与线程的数量不一定是一对一关系,在多数情况下,一个物理线程实际上需要处理多个逻辑任务。ForkJoinPool线程池中最常用的方法是:
ForkJoinTask submit(ForkJoinTask task)向线程池提交一个ForkJoinTask任务,ForkJoinTask任务支持fork()分解与join()等待的任务。ForkJoinTask有两个重要的子类:RecursiveAction和RecursiveTask,它们的区别在于RecursiveAction任务没有返回值,RecursiveTask任务可以带有返回值。 public class Test06 {
/**
* 计算数列的和,需要返回结果,可以定义任务继承RecursiveTask
*/
private static class CountTask extends RecursiveTask<Long> {
/**
* 当个任务量阀值
*/
private static final int THRESHOLD = 10000;
/**
* 分解任务数
*/
private static final int TASK_NUM = 100;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start < THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 拆分任务
long step = (start + end) / 100;
ArrayList<CountTask> subTaskList = new ArrayList<>();
long pos = start;
for (int i = 0; i < TASK_NUM; i++) {
long lastOne = pos + step;
if (lastOne > end) {
lastOne = end;
}
CountTask task = new CountTask(pos, lastOne);
subTaskList.add(task);
// 调用fork()提交子任务
task.fork();
pos += step + 1;
}
// 合并结果
for (CountTask task :
subTaskList) {
sum += task.join();
}
}
return sum;
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0L, 200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
Long res = null;
try {
res = result.get();
System.out.println("计算数列结果为:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}