- 1.3 创建线程的4种方法
- 1.4 线程的核心原理
- 1.5 线程的基本操作
- 1.6 线程池原理与实战
- 1.6.1 JUC的线程池架构
- 1.6.2 Executors的4种快捷创建线程池的方法
- 1.6.3 线程池的标准创建方式
- 1.6.4 向线程池提交任务的两种方式
- 1.6.5 线程池的任务调度流程
- 1.6.6 ThreadFactory(线程工厂)
- 1.6.7 任务阻塞队列
- 1.6.8 调度器的钩子方法
- 1.6.9 线程池的拒绝策略
- 1.6.10 线程池的优雅关闭
- 1.6.11 Executors快捷创建线程池的潜在问题
- 1.7.3 为CPU密集型任务确定线程数
- 1.7.4 为混合型任务确定线程数
- 1.8 ThreadLocal原理与实战
- 1.8.1 ThreadLocal的基本使用
- 1.8.2 ThreadLocal的使用场景
- 1.8.3 使用ThreadLocal进行线程隔离
- 1.8.5 ThreadLocal内部结构演进
- 1.8.6 ThreadLocal源码分析
- 1.8.7 ThreadLocalMap源码分析
- 1.8.8 ThreadLocal综合使用案例
1.3 创建线程的4种方法
1.3.1 Thread类详解
- 线程ID
属性:private long tid,此属性用于保存线程的ID。这是一个private类型的属性,外部只能使用getId()方法访问线程的ID。
方法:public long getId(),获取线程ID,线程ID由JVM进行管理,在进程内唯一。比如,1.2节的实例中,所输出的main线程的ID为1。
- 线程名称
属性:private String name,该属性保存一个Thread线程实例的名字。
方法一:public final String getName(),获取线程名称。
方法二:public final void setName(String name),设置线程名称。
方法三:Thread(String threadName),通过此构造方法给线程设置一个定制化的名字。
- 线程优先级
属性:private int priority,保存一个Thread线程实例的优先级。
方法一:public final int getPriority(),获取线程优先级。
方法二:public final void setPriority(int priority),设置线程优先级。
Java线程的最大优先级值为10,最小值为1,默认值为5。这三个优先级值为三个常量值,在Thread类中使用类常量定义,三个类常量如下:
public static final int MIN_PRIORITY = 1;
public static final int NORM_PRIORITY = 5;
public static final int MAX_PRIORITY = 10;
- 是否为守护线程
属性:private boolean daemon=false,该属性保存Thread线程实例的守护状态,默认为false,表示是普通的用户线程,而不是守护线程。
方法:public final void setDaemon(boolean on),将线程实例标记为守护线程或用户线程,如果参数值为true,那么将线程实例标记为守护线程。
- 线程的状态
Thread的内部静态枚举类State用于定义Java线程的所有状态,具体如下: ```java public static enum State { NEW, //新建 RUNNABLE, //就绪、运行 BLOCKED, //阻塞 WAITING, //等待 TIMED_WAITING, //计时等待 TERMINATED; //结束 }private int threadStatus
public Thread.State getState() //返回表示当前线程的执行状态,为新建、就绪、运行、阻塞、结束等状态中的一种。
在Java线程的状态中,就绪状态和运行状态在内部用同一种状态RUNNABLE表示。就绪状态表示线程具备运行条件,正在等待获取CPU时间片;运行状态表示线程已经获取了CPU时间片,CPU正在执行线程代码逻辑。
6. 线程的启动和运行
```java
public void start()
public void run()
start()方法用于线程的启动,run()方法作为用户代码逻辑的执行入口。
取得当前线程
public static Thread currentThread()
-
1.3.2 创建一个空线程
第一个创建线程的方法是通过继承Thread类创建一个线程实例 ```java package com.crazymakercircle.multithread.basic.create; import personal.nien.javabook.util.Print; public class EmptyThreadDemo {
public static void main(String args[]) throws InterruptedException {
//使用Thread类创建和启动线程
Thread thread = new Thread();
Print.cfo("线程名称:"+thread.getName());
Print.cfo("线程ID:"+thread.getId());
Print.cfo("线程状态:"+thread.getState());
Print.cfo("线程优先级:"+thread.getPriority());
Print.cfo(getCurThreadName() + " 运行结束.");
thread.start();
}
}
在thread线程信息输出完成后,程序调用thread.start()实例方法启动新线程thread的执行。从上一小节大家知道,这时新线程的执行会调用Thread的run()实例方法,该方法作为用户业务代码逻辑的入口。
```java
public void run() {
if(this.target != null) {
this.target.run();
}
}
这里的target属性是Thread类的一个实例属性,该属性是很重要的,在后面会用到和讲到。在Thread类中,target的属性值默认为空。在这个例子中,thread线程的target属性默认为null。所以在thread线程执行时,其run()方法其实什么也没有做,线程就执行完了。
1.3.3 线程创建方法一:继承Thread类创建线程类
- 需要继承Thread类,创建一个新的线程类。
同时重写run()方法,将需要并发执行的业务代码编写在run()方法中。 ```java public class CreateDemo { public static final int MAX_TURN = 5; public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static int threadNo = 1;
static class DemoThread extends Thread {
public DemoThread() {
super("DemoThread-" + threadNo++);
}
public void run() {
for (int i = 1; i < MAX_TURN; i++) {
Print.cfo(getName() + ", 轮次:" + i);
}
Print.cfo(getName() + " 运行结束.");
}
}
public static void main(String[] args) {
Thread thread = null;
for (int i = 0; i < 2; i++) {
thread = new DemoThread();
thread.start();
}
Print.cfo(getCurThreadName() + " 运行结束.");
} }
<a name="qBGBz"></a>
## 1.3.4 线程创建方法二:实现Runnable接口创建线程目标类
重温一下Thread类的run()方法的代码,里边其实有点玄机,其代码如下:
```java
package java.lang;
public class Thread implements Runnable {
...
private Runnable target; //执行目标
public void run() {
if(this.target != null) {
this.target.run(); //调用执行目标的run()方法
}
}
public Thread(Runnable target) { //包含执行目标的构造器
init(null, target, "Thread-" + nextThreadNum(), 0);
}
}
在Thread类的run()方法中,如果target(执行目标)不为空,就执行target属性的run()方法。而target属性是Thread类的一个实例属性,并且target属性的类型为Runnable。<br />Thread类的target属性在什么情况下非空呢?Thread类有一系列的构造器,其中有多个构造器可以为target属性赋值,这些构造器包括如下两个:
- public Thread(Runnable target)
- public Thread(Runnable target,String name)
- Runnable接口
Runnable是一个极为简单的接口,位于java.lang包中。接口中只有一个方法run(),具体的源代码如下:
package java.lang;
@FunctionalInterface
public interface Runnable {
void run();
}
- 通过实现Runnable接口创建线程类
创建线程的第二种方法就是实现Runnable接口,将需要异步执行的业务逻辑代码放在Runnable实现类的run()方法中,将Runnable实例作为target执行目标传入Thread实例。该方法的具体步骤如下:
- 定义一个新类实现Runnable接口。
- 实现Runnable接口中的run()抽象方法,将线程代码逻辑存放在该run()实现版本中。
- 通过Thread类创建线程对象,将Runnable实例作为实际参数传递给Thread类的构造器,由Thread构造器将该Runnable实例赋值给自己的target执行目标属性。
- 调用Thread实例的start()方法启动线程。
线程启动之后,线程的run()方法将被JVM执行,该run()方法将调用target属性的run()方法,从而完成Runnable实现类中业务代码逻辑的并发执行。
public class CreateDemo2 {
public static final int MAX_TURN = 5;
static int threadNo = 1;
static class RunTarget implements Runnable {
@Override
public void run() {
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(ThreadUtil.getCurThreadName() + ", 轮次:" + j);
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
public static void main(String[] args) {
Thread thread = null;
for (int i = 0; i < 2; i++) {
RunTarget runTarget = new RunTarget();
thread = new Thread(runTarget, "RunnableThread" + threadNo++);
thread.start();
}
}
}
完成了Runnable的实现类后,需要调用Thread类的构造器创建线程,并将Runnable实现类的实例作为实参传入。可以调用的构造器(即构造函数)包括如下三个:
public Thread(Runnable target)
- public Thread(Runnable target,String name)
public Thread(ThreadGroup group,Runnable target)
1.3.5 优雅创建Runnable线程目标类的两种方式
通过匿名类优雅地创建Runnable线程目标类。
在实现Runnable编写target执行目标类时,如果target实现类是一次性类,可以使用匿名实例的形式。
package com.crazymakercircle.multithread.basic.create;
// 省略import
public class CreateDemo2 {
public static final int MAX_TURN = 5;
static int threadNo = 1;
public static void main(String args[]) throws InterruptedException {
Thread thread = null;
//使用Runnable的匿名类创建和启动线程
for (int i = 0; i < 2; i++) {
thread = new Thread(new Runnable() { //① 匿名实例
@Override
public void run() { //② 异步执行的业务逻辑
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(getCurThreadName() + ", 轮次:" + j);
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}, "RunnableThread" + threadNo++);
thread.start();
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
使用Lambda表达式优雅地创建Runnable线程目标类。 ```java package com.crazymakercircle.multithread.basic.create; // 省略import public class CreateDemo2 {
public static final int MAX_TURN = 5;
static int threadNo = 1;
public static void main(String args[]) throws InterruptedException {
Thread thread = null;
//使用Lambda表达式形式创建和启动线程
for (int i = 0; i < 2; i++) {
thread = new Thread( ()-> { //①Lambda表达式
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(getCurThreadName() + ", 轮次:" + j);
}
Print.cfo(getCurThreadName() + " 运行结束.");
}, "RunnableThread" + threadNo++);
thread.start();
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
<a name="QnN5E"></a>
## 1.3.6 通过实现Runnable接口的方式创建线程目标类的优缺点
通过实现Runnable接口的方式创建线程目标类有以下缺点:
1. 所创建的类并不是线程类,而是线程的target执行目标类,需要将其实例作为参数传入线程类的构造器,才能创建真正的线程。
1. 如果访问当前线程的属性(甚至控制当前线程),不能直接访问Thread的实例方法,必须通过Thread.currentThread()获取当前线程实例,才能访问和控制当前线程。
通过实现Runnable接口的方式创建线程目标类有以下优点:
1. 可以避免由于Java单继承带来的局限性。如果异步逻辑所在类已经继承了一个基类,就没有办法再继承Thread类。比如,当一个Dog类继承了Pet类,再要继承Thread类就不行了。所以在已经存在继承关系的情况下,只能使用实现Runnable接口的方式。
1. 逻辑和数据更好分离。通过实现Runnable接口的方法创建多线程更加适合同一个资源被多段业务逻辑并行处理的场景。在同一个资源被多个线程逻辑异步、并行处理的场景中,通过实现Runnable接口的方式设计多个target执行目标类可以更加方便、清晰地将执行逻辑和数据存储分离,更好地体现了面向对象的设计思想。
- “逻辑和数据更好地分离”演示实例
```java
public class SalesDemo {
public static final int MAX_AMOUNT = 5; //商品数量
//商店商品类(销售线程类),一个商品一个销售线程,每个线程异步销售4次
static class StoreGoods extends Thread {
StoreGoods(String name) {
super(name);
}
private int goodsAmount = MAX_AMOUNT;
public void run() {
for (int i = 0; i < MAX_AMOUNT; i++) {
if (this.goodsAmount > 0) {
Print.cfo(getCurThreadName() + " 卖出一件,还剩:" + (--goodsAmount));
sleepMilliSeconds(10);
}
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
//商场商品类(target销售线程的目标类),一个商品最多销售4次,可以多人销售
static class MallGoods implements Runnable {
//多人销售可能导致数据出错,使用原子数据类型保障数据安全
private AtomicInteger goodsAmount = new AtomicInteger(MAX_AMOUNT);
public void run() {
for (int i = 0; i <= MAX_AMOUNT; i++) {
if (this.goodsAmount.get() > 0) {
Print.cfo(getCurThreadName() + " 卖出一件,还剩:"
+ (goodsAmount.decrementAndGet()));
sleepMilliSeconds(10);
}
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
public static void main(String args[]) throws InterruptedException {
Print.hint("商店版本的销售");
for (int i = 1; i <= 2; i++) {
Thread thread = null;
thread = new StoreGoods("店员-" + i);
thread.start();
}
Thread.sleep(1000);
Print.hint("商场版本的销售");
MallGoods mallGoods = new MallGoods();
for (int i = 1; i <= 2; i++) {
Thread thread = null;
thread = new Thread(mallGoods, "商场销售员-" + i);
thread.start();
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
- 通过继承Thread类实现多线程能更好地做到多个线程并发地完成各自的任务,访问各自的数据资源。
- 通过实现Runnable接口实现多线程能更好地做到多个线程并发地完成同一个任务,访问同一份数据资源。多个线程的代码逻辑可以方便地访问和处理同一个共享数据资源(如例子中的MallGoods.goodsAmount),这样可以将线程逻辑和业务数据进行有效的分离,更好地体现了面向对象的设计思想。
通过实现Runnable接口实现多线程时,如果数据资源存在多线程共享的情况,那么数据共享资源需要使用原子类型(而不是普通数据类型),或者需要进行线程的同步控制,以保证对共享数据操作时不会出现线程安全问题。
1.3.7 线程创建方法三:使用Callable和FutureTask创建线程
前面已经介绍了继承Thread类或者实现Runnable接口这两种方式来创建线程类,但是这两种方式有一个共同的缺陷:不能获取异步执行的结果。
为了解决异步执行的结果问题,Java语言在1.5版本之后提供了一种新的多线程创建方法:通过Callable接口和FutureTask类相结合创建线程。Callable接口 ```java package java.util.concurrent; @FunctionalInterface public interface Callable
{ V call() throws Exception;
}
2. RunnableFuture接口
```java
package java.util.concurrent;
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
RunnableFuture继承了Runnable接口,从而保证了其实例可以作为Thread线程实例的target目标;同时,RunnableFuture通过继承Future接口,保证了可以获取未来的异步执行结果。
- Future接口
- 能够取消异步执行中的任务。
- 判断异步任务是否执行完成。
- 获取异步任务完成后的执行结果。
```java
package java.util.concurrent;
public interface Future
{
}boolean cancel(boolean mayInterruptRunning); //取消异步执行
boolean isCancelled();
boolean isDone();//判断异步任务是否执行完成
//获取异步任务完成后的执行结果
V get() throws InterruptedException, ExecutionException;
//设置时限,获取异步任务完成后的执行结果
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException;
...
4. FutureTask类
FutureTask类才是真正的在Thread与Callable之间搭桥的类。
![image.png](https://cdn.nlark.com/yuque/0/2022/png/27178439/1649937555089-b0ce1cf5-3c99-4e57-8d35-9d5c6d62c4fc.png#clientId=ue9b87c13-8f16-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=441&id=u573dbd07&margin=%5Bobject%20Object%5D&name=image.png&originHeight=551&originWidth=575&originalType=binary&ratio=1&rotation=0&showTitle=false&size=86442&status=done&style=none&taskId=u4a270b74-8bbc-4498-bf2a-32c7555a843&title=&width=460)<br />FutureTask的outcome实例属性用于保存callable成员call()方法的异步执行结果。在FutureTask类的run()方法完成callable成员的call()方法的执行之后,其结果将被保存在outcome实例属性中,供FutureTask类的get()方法获取。
5. 使用Callable和FutureTask创建线程的具体步骤
1. 创建一个Callable接口的实现类,并实现其call()方法,编写好异步执行的具体逻辑,可以有返回值。
1. 使用Callable实现类的实例构造一个FutureTask实例。
1. 使用FutureTask实例作为Thread构造器的target入参,构造新的Thread线程实例。
1. 调用Thread实例的start()方法启动新线程,启动新线程的run()方法并发执行。其内部的执行过程为:启动Thread实例的run()方法并发执行后,会执行FutureTask实例的run()方法,最终会并发执行Callable实现类的call()方法。
1. 调用FutureTask对象的get()方法阻塞性地获得并发线程的执行结果。
```java
package com.crazymakercircle.multithread.basic.create;
// 省略import
public class CreateDemo3 {
public static final int MAX_TURN = 5;
public static final int COMPUTE_TIMES = 100000000;
//①创建一个 Callable 接口的实现类
static class ReturnableTask implements Callable<Long> {
//②编写好异步执行的具体逻辑,可以有返回值
public Long call() throws Exception{
long startTime = System.currentTimeMillis();
Print.cfo(getCurThreadName() + " 线程运行开始.");
Thread.sleep(1000);
for (int i = 0; i < COMPUTE_TIMES; i++) {
int j = i * 10000;
}
long used = System.currentTimeMillis() - startTime;
Print.cfo(getCurThreadName() + " 线程运行结束.");
return used;
}
}
public static void main(String args[]) throws InterruptedException {
ReturnableTask task=new ReturnableTask();//③
FutureTask<Long> futureTask = new FutureTask<Long>(task);//④
Thread thread = new Thread(futureTask, "returnableThread");//⑤
thread.start();//⑥
Thread.sleep(500);
Print.cfo(getCurThreadName() + " 让子弹飞一会儿.");
Print.cfo(getCurThreadName() + " 做一点自己的事情.");
for (int i = 0; i < COMPUTE_TIMES / 2; i++) { int j = i * 10000;
}
Print.cfo(getCurThreadName() + " 获取并发任务的执行结果.");
try {
Print.cfo(thread.getName()+"线程占用时间:"
+ futureTask.get());//⑦
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Print.cfo(getCurThreadName() + " 运行结束.");
}
}
1.3.8 线程创建方法四:通过线程池创建线程
- 线程池的创建与执行目标提交
向ExecutorService线程池提交异步执行target目标任务的常用方法有: ```java //方法一:执行一个 Runnable类型的target执行目标实例,无返回 void execute(Runnable command);//创建一个包含三个线程的线程池
private static ExecutorService pool = Executors.newFixedThreadPool(3);
//方法二:提交一个 Callable类型的target执行目标实例, 返回一个Future异步任务实例
//方法三:提交一个 Runnable类型的target执行目标实例, 返回一个Future异步任务实例 Future<?> submit(Runnable task);
2. 线程池的使用实战
```java
public class CreateDemo4 {
public static final int MAX_TURN = 5;
public static final int COMPUTE_TIMES = 100000000;
//创建一个包含三个线程的线程池
private static ExecutorService pool = Executors.newFixedThreadPool(3);
static class DemoThread implements Runnable {
@Override
public void run() {
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(getCurThreadName() + ", 轮次:" + j);
sleepMilliSeconds(10);
}
}
}
static class ReturnableTask implements Callable<Long> {
@Override
public Long call() throws Exception {
long startTime = System.currentTimeMillis();
Print.cfo(getCurThreadName() + " 线程运行开始.");
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(getCurThreadName() + ", 轮次:" + j);
sleepMilliSeconds(10);
}
long used = System.currentTimeMillis() - startTime;
Print.cfo(getCurThreadName() + " 线程运行结束.");
return used;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
pool.execute(new DemoThread()); //执行线程实例,无返回
pool.execute(new Runnable() {
@Override
public void run() {
for (int j = 1; j < MAX_TURN; j++) {
Print.cfo(getCurThreadName() + ", 轮次:" + j);
sleepMilliSeconds(10);
}
}
});
//提交Callable 执行目标实例,有返回
Future future = pool.submit(new ReturnableTask());
Long result = (Long) future.get();
Print.cfo("异步任务的执行结果为:" + result);
}
}
ExecutorService线程池的execute(…)与submit(…)方法的区别如下。
- 接收的参数不一样
- submit()有返回值,而execute()没有
1.4 线程的核心原理
1.4.1 线程的调度与时间片
1.4.2 线程的优先级
在Thread类中有一个实例属性和两个实例方法,专门用于进行线程优先级相关的操作。 ```java private int priority;//该属性保存一个Thread实例的优先级,即1~10的值 public final int getPriority(); public final void setPriority(int priority);
Thread实例的priority属性默认是级别5,对应的类常量是NORM_PRIORITY。优先级最大值为10,最小值为1,Thread类中定义的三个优先级常量如下:
```java
public static final int MIN_PRIORITY = 1;
public static final int NORM_PRIORITY = 5;
public static final int MAX_PRIORITY = 10;
Java中使用抢占式调度模型进行线程调度。priority实例属性的优先级越高,线程获得CPU时间片的机会就越多,但也不是绝对的。举一个例子,顺便演示以上两个线程优先级实例方法的使用,具体如下:
package com.crazymakercircle.multithread.basic.create2;
import personal.nien.javabook.util.Print;
public class PriorityDemo {
public static final int SLEEP_GAP = 1000;
static class PrioritySetThread extends Thread {
static int threadNo = 1;
public PrioritySetThread() {
super("thread-" + threadNo);
threadNo++;
}
public long opportunities=0;
public void run() {
for (int i = 0; ; i++) {
opportunities++;
}
}
}
public static void main(String args[]) throws InterruptedException {
PrioritySetThread[] threads=new PrioritySetThread[10];
for (int i = 0; i < threads.length; i++) {
threads[i]=new PrioritySetThread();
//优先级的设置,1~10
threads[i].setPriority(i+1);
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();//启动线程
}
Thread.sleep(SLEEP_GAP); //等待线程运行1s
for (int i = 0; i < threads.length; i++) {
threads[i].stop(); //停止线程
}
for (int i = 0; i < threads.length; i++) {
Print.cfo(threads[i].getName()+
"-优先级为-"+threads[i].getPriority()+ //获取优先级
"-机会值为-"+threads[i].opportunities
);
}
}
}
- 整体而言,高优先级的线程获得的执行机会更多。从实例中可以看到:优先级在6级以上的线程和4级以下的线程执行机会明显偏多,整体对比非常明显
- 执行机会的获取具有随机性,优先级高的不一定获得的机会多。比如,例子中的thread-10比thread-9优先级高,但是thread-10所获得的机会反而偏少。
1.4.3 线程的生命周期
Java中线程的生命周期分为6种状态。Thread类有一个实例属性和一个实例方法专门用于保存和获取线程的状态。 ```java private int threadStatus;//以整数的形式保存线程的状态 public Thread.State getState(); //返回当前线程的执行状态,一个枚举类型值 public static enum State {
}NEW, //新建
RUNNABLE, //可执行:包含操作系统的就绪、运行两种状态
BLOCKED, //阻塞
WAITING, //等待
TIMED_WAITING, //限时等待
TERMINATED; //终止
1. NEW状态
创建成功但是没有调用start()方法启动的Thread线程实例都处于NEW状态。
2. RUNNABLE状态
NEW状态的Thread实例调用了start()方法后,线程的状态将变成RUNNABLE状态。尽管如此,线程的run()方法不一定会马上被并发执行,需要在线程获取了CPU时间片之后才真正启动并发执行。
3. TERMINATED状态
处于RUNNABLE状态的线程在run()方法执行完成之后就变成终止状态TERMINATED了。当然,如果在run()方法执行过程中发生了运行时异常而没有被捕获,run()方法将被异常终止,线程也会变成TERMINATED状态。
4. TIMED_WAITING状态
线程处于一种特殊的等待状态,准确地说,线程处于限时等待状态。能让线程处于限时等待状态的操作大致有以下几种:<br />a. Thread.sleep(int n):使得当前线程进入限时等待状态,等待时间为n毫秒。<br />b. Object.wait():带时限的抢占对象的monitor锁。<br />c. Thread.join():带时限的线程合并。<br />d. LockSupport.parkNanos():让线程等待,时间以纳秒为单位。<br />e. LockSupport.parkUntil():让线程等待,时间可以灵活设置。
<a name="iiDb5"></a>
## 1.4.4 一个线程状态的简单演示案例
```java
public class StatusDemo {
//每个线程执行的轮次
public static final long MAX_TURN = 5;
//线程编号
static int threadSeqNumber = 0;
//全局的静态线程列表
static List<Thread> threadList = new ArrayList<>();
//输出静态线程列表中每个线程的状态
private static void printThreadStatus() {
for (Thread thread : threadList) {
Print.tco(thread.getName() + " 状态为 " + thread.getState());
}
}
//向全局的静态线程列表加入线程
private static void addStatusThread(Thread thread) {
threadList.add(thread);
}
static class StatusDemoThread extends Thread {
public StatusDemoThread() {
super("statusPrintThread" + (++threadSeqNumber));
//将自己加入全局的静态线程列表
addStatusThread(this);
}
public void run() {
Print.cfo(getName() + ", 状态为" + getState());
for (int turn = 0; turn < MAX_TURN; turn++) {
//线程睡眠
sleepMilliSeconds(500);
//输出所有线程的状态
printThreadStatus();
}
Print.tco(getName() + "- 运行结束.");
}
}
public static void main(String args[]) throws InterruptedException {
//将main线程加入全局列表
addStatusThread(Thread.currentThread());
//新建三个线程,这些线程在构造器中会将自己加入全局列表
Thread sThread1 = new StatusDemoThread();
Print.cfo(sThread1.getName() + "- 状态为" + sThread1.getState());
Thread sThread2 = new StatusDemoThread();
Print.cfo(sThread2.getName() + "- 状态为" + sThread2.getState());
Thread sThread3 = new StatusDemoThread();
Print.cfo(sThread3.getName() + "- 状态为" + sThread3.getState());
sThread1.start(); //启动第一个线程
sleepMilliSeconds(500); //等待500毫秒启动第二个线程
sThread2.start();
sleepMilliSeconds(500); //等待1000毫秒启动第三个线程
sThread3.start();
sleepSeconds(100); //睡眠100秒
}
}
对于示例中调用的sleepMilliSeconds()方法,它的内部调用了LockSupport.parkNanos(…)方法使得当前线程限时等待,这是为了编程快捷而自定义的方法,其代码如下:
package com.crazymakercircle.util;
// 省略import
public class ThreadUtil
{
public static void sleepMilliSeconds(int millisecond)
{
LockSupport.parkNanos(millisecond * 1000L * 1000L);
}
// 省略其他方法
}
1.4.5 使用Jstack工具查看线程状态
1.5 线程的基本操作
1.5.1 线程名称的设置和获取
public class ThreadNameDemo {
private static final int MAX_TURN = 3;
//异步执行目标类
static class RunTarget implements Runnable { // 实现Runnable接口
public void run() { // 重新run()方法
for (int turn = 0; turn < MAX_TURN; turn++) {
sleepMilliSeconds(500);//线程睡眠
Print.tco("线程执行轮次:" + turn);
}
}
}
public static void main(String args[]) {
RunTarget target = new RunTarget(); // 实例化Runnable异步执行目标类
new Thread(target).start(); // 系统自动设置线程名称
new Thread(target).start(); // 系统自动命令线程名称
new Thread(target).start(); // 系统自动命令线程名称
new Thread(target, "手动命名线程-A").start(); // 手动设置线程名称
new Thread(target, "手动命名线程-B").start(); // 手动设置线程名称
sleepSeconds(Integer.MAX_VALUE); //主线程不能结束
}
}
Print.tco()方法的代码如下
package com.crazymakercircle.util;
public class Print
{
/**
* 在正式输出的内容前输出线程的名称
* @param s 待输出的字符串
*/
public static void tco(Object s)
{
String cft = "[" + Thread.currentThread().getName() + "]" + ":" + s;
//提交线程池进行异步输出,使得输出过程不影响当前线程的执行
//异步输出的好处:不会造成输出乱序,也不会造成当前线程阻塞
ThreadUtil.execute(() ->
{
synchronized (System.out)
{
System.out.println(cft);
}
});
}
//省去不相干的代码
}
1.5.2 线程的sleep操作
Sleep()方法定义在Thread类中,是一组静态方法,有两个重载版本:
//使目前正在执行的线程休眠millis毫秒
public static void sleep(long millis) throws InterruptException;
//使目前正在执行的线程休眠millis毫秒,nanos纳秒
public static void sleep(long millis,int nanos) throws InterruptException;
举一个例子演示一下sleep()静态方法的调用,具体如下:
public class SleepDemo {
public static final int SLEEP_GAP = 5000;//睡眠时长
public static final int MAX_TURN = 50;//睡眠次数
static class SleepThread extends Thread {
static int threadSeqNumber = 1;
public SleepThread() {
super("sleepThread-" + threadSeqNumber);
threadSeqNumber++;
}
public void run() {
try {
for (int i = 1; i < MAX_TURN; i++) {
Print.tco(getName() + ", 睡眠轮次:" + i);
// 线程睡眠一会
Thread.sleep(SLEEP_GAP);
}
} catch (InterruptedException e) {
Print.tco(getName() + " 发生异常被中断.");
}
Print.tco(getName() + " 运行结束.");
}
}
public static void main(String args[]) throws InterruptedException {
for (int i = 0; i < 5; i++) {
Thread thread = new SleepThread();
thread.start();
}
Print.tco(getCurThreadName() + " 运行结束.");
}
}
C:\Users\user>jps
8468 Jps
18024 SleepDemo
C:\Users\user>jstack 18024
// 省略不相干的输出
"sleepThread-4" #17 prio=5 os_prio=0 tid=0x000000001fd21800 nid=0x462c waiting on condition [0x0000000020cbf000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ...SleepDemo$SleepThread.run(SleepDemo.java:35)
"sleepThread-3" #16 prio=5 os_prio=0 tid=0x000000001fd1e800 nid=0x28a4 waiting on condition [0x0000000020bbf000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ...SleepDemo$SleepThread.run(SleepDemo.java:35)
"sleepThread-2" #15 prio=5 os_prio=0 tid=0x000000001fd1e000 nid=0x1264 waiting on condition [0x0000000020abf000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ...SleepDemo$SleepThread.run(SleepDemo.java:35)
"sleepThread-1" #14 prio=5 os_prio=0 tid=0x000000001fd29000 nid=0x1914 waiting on condition [0x00000000209be000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ...SleepDemo$SleepThread.run(SleepDemo.java:35)
1.5.3 线程的interrupt操作
一个线程什么时候可以退出呢?当然只有线程自己才能知道。所以,这里介绍一下Thread的interrupt()方法,此方法本质不是用来中断一个线程,而是将线程设置为中断状态。
public class InterruptDemo {
public static final int SLEEP_GAP = 5000;//睡眠时长
public static final int MAX_TURN = 50;//睡眠次数
static class SleepThread extends Thread {
static int threadSeqNumber = 1;
public SleepThread() {
super("sleepThread-" + threadSeqNumber);
threadSeqNumber++;
}
public void run() {
try {
Print.tco(getName() + " 进入睡眠.");
// 线程睡眠一会
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
Print.tco(getName() + " 发生被异常打断.");
return;
}
Print.tco(getName() + " 运行结束.");
}
}
public static void main(String args[]) throws InterruptedException {
Thread thread1 = new SleepThread();
thread1.start();
Thread thread2 = new SleepThread();
thread2.start();
sleepSeconds(2);//等待2秒
thread1.interrupt(); //打断线程1
sleepSeconds(5);//等待5秒
thread2.interrupt(); //打断线程2,此时线程2已经终止
sleepSeconds(1);//等待1秒
Print.tco("程序运行结束.");
}
}
下面的示例程序演示如何调用isInterrupted()实例方法监视线程的中断状态,如果发现线程被中断,就进行相应的处理,具体的代码如下:
package com.crazymakercircle.multithread.basic.use;
// 省略import
public class InterruptDemo
{
//测试用例:获取异步调用的结果
@Test
public void testInterrupted2()
{
Thread thread = new Thread()
{
public void run()
{
Print.tco("线程启动了");
//一直循环
while (true)
{
Print.tco(isInterrupted());
sleepMilliSeconds(5000);
//如果线程被中断,就退出死循环
if (isInterrupted())
{
Print.tco("线程结束了");
return;
}
}
}
};
thread.start();
sleepSeconds(2); //等待2秒
thread.interrupt(); //中断线程
sleepSeconds(2); //等待2秒
thread.interrupt();
}
}
1.5.4 线程的join操作
- 线程的join操作的三个版本 ```java //重载版本1:此方法会把当前线程变为TIMED_WAITING,直到被合并线程执行结束 public final void join() throws InterruptedException:
//重载版本2:此方法会把当前线程变为TIMED_WAITING,直到被合并线程执行结束,或者等待被合并线程执行millis的时间 public final synchronized void join(long millis) throws InterruptedException:
//重载版本3:此方法会把当前线程变为TIMED_WAITING,直到被合并线程执行结束,或者等待被合并线程执行millis+nanos的时间 public final synchroinzed void join(long millis, int nanos) throws InterruptedException:
2. 线程的join操作的演示实例
```java
package com.crazymakercircle.multithread.basic.use;
// 省略import
public class JoinDemo
{
public static final int SLEEP_GAP = 5000;//睡眠时长
public static final int MAX_TURN = 50;//睡眠次数
static class SleepThread extends Thread
{
// 省略SleepThread的代码,执行时睡眠5秒
// 具体代码与上一小节的SleepThread内部类相同,也可以参考随书源码
}
public static void main(String args[])
{
Thread thread1 = new SleepThread();
Print.tco("启动 thread1.");
thread1.start();
try
{
thread1.join();//合并线程1,不限时
} catch (InterruptedException e)
{
e.printStackTrace();
}
Print.tco("启动 thread2.");
//启动第二条线程,并且进行限时合并,等待时间为1秒
Thread thread2 = new SleepThread();
thread2.start();
try
{
thread2.join(1000);//限时合并,限时1秒
} catch (InterruptedException e)
{
e.printStackTrace();
}
Print.tco("线程运行结束.");
}
}
- join线程的WAITING状态
-
1.5.5 线程的yield操作
线程的yield(让步)操作的作用是让目前正在执行的线程放弃当前的执行,让出CPU的执行权限,使得CPU去执行其他的线程。
yield()方法是Thread类提供的一个静态方法,它可以让当前正在执行的线程暂停,但它不会阻塞该线程,只是让线程转入就绪状态。yield只是让当前线程暂停一下,让系统的线程调度器重新调度一次,yield()方法只有一个版本: ```java package com.crazymakercircle.multithread.basic.use; // 省略import public class YieldDemo {public static final int MAX_TURN = 100;//执行次数
public static AtomicInteger index = new AtomicInteger(0);//执行编号
// 记录线程的执行次数
private static Map<String, AtomicInteger> metric = new HashMap<>();
//输出线程的执行次数
private static void printMetric()
{
Print.tco("metric = " + metric);
}
static class YieldThread extends Thread
{
static int threadSeqNumber = 1;
public YieldThread()
{
super("sleepThread-" + threadSeqNumber);
threadSeqNumber++;
//将线程加入执行次数统计map
metric.put(this.getName(), new AtomicInteger(0));
}
public void run()
{
for (int i = 1; i < MAX_TURN && index.get() < MAX_TURN; i++)
{
Print.tco("线程优先级:" + getPriority());
index.incrementAndGet();
//统计一次
metric.get(this.getName()).incrementAndGet();
if (i % 2 == 0)
{
//让步:出让执行的权限
Thread.yield();
}
}
//输出所有线程的执行次数
printMetric();
Print.tco(getName() + " 运行结束.");
}
}
@Test
public void test()
{
Thread thread1 = new YieldThread();
//设置为最高的优先级
thread1.setPriority(Thread.MAX_PRIORITY);
Thread thread2 = new YieldThread();
//设置为最低的优先级
thread2.setPriority(Thread.MIN_PRIORITY);
Print.tco("启动线程.");
thread1.start();
thread2.start();
sleepSeconds(100);
}
}
总结起来,Thread.yeid()方法有以下特点:<br />(1)yield仅能使一个线程从运行状态转到就绪状态,而不是阻塞状态。<br />(2)yield不能保证使得当前正在运行的线程迅速转换到就绪状态。<br />(3)即使完成了迅速切换,系统通过线程调度机制从所有就绪线程中挑选下一个执行线程时,就绪的线程有可能被选中,也有可能不被选中,其调度的过程受到其他因素(如优先级)的影响。
<a name="Y3vdO"></a>
## 1.5.6 线程的daemon操作
1. 守护线程的基本操作
1. 守护线程的基本操作演示实例
```java
package com.crazymakercircle.multithread.basic.use;
// 省略import
public class DaemonDemo
{
public static final int SLEEP_GAP = 500; //每一轮的睡眠时长
public static final int MAX_TURN = 4; //用户线程执行轮次
//守护线程实现类
static class DaemonThread extends Thread
{
public DaemonThread()
{
super("daemonThread");
}
public void run()
{
Print.synTco("--daemon线程开始.");
for (int i = 1; ; i++) //死循环
{
Print.synTco("--轮次:" + i);
Print.synTco("--守护状态为:" + isDaemon());
// 线程睡眠一会,500毫秒
sleepMilliSeconds(SLEEP_GAP);
}
}
}
public static void main(String args[]) throws InterruptedException
{
Thread daemonThread = new DaemonThread();
daemonThread.setDaemon(true);
daemonThread.start();
//创建一条用户线程,执行4轮
Thread userThread = new Thread(() ->
{
Print.synTco(">>用户线程开始.");
for (int i = 1; i <= MAX_TURN; i++)
{
Print.synTco(">>轮次:" + i);
Print.synTco(">>守护状态为:" + getCurThread().isDaemon());
sleepMilliSeconds(SLEEP_GAP);
}
Print.synTco(">>用户线程结束.");
}, "userThread");
//启动用户线程
userThread.start();
Print.synTco(" 守护状态为:" + getCurThread().isDaemon());
Print.synTco(" 运行结束.");
}
}
- 守护线程与用户线程的关系
从是否为守护线程的角度,对Java线程进行分类,分为用户线程和守护线程。守护线程和用户线程的本质区别是:二者与JVM虚拟机进程终止的方向不同。用户线程和JVM进程是主动关系,如果用户线程全部终止,JVM虚拟机进程也随之终止;守护线程和JVM进程是被动关系,如果JVM进程终止,所有的守护线程也随之终止,
- 守护线程的要点
- 守护线程必须在启动前将其守护状态设置为true,启动之后不能再将用户线程设置为守护线程,否则JVM会抛出一个InterruptedException异常。
- 守护线程存在被JVM强行终止的风险,所以在守护线程中尽量不去访问系统资源,如文件句柄、数据库连接等。
- 守护线程创建的线程也是守护线程。
Executor是Java异步目标任务的“执行者”接口,其目标是执行目标任务。“执行者”Executor提供了execute()接口来执行已提交的Runnable执行目标实例。Executor作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制。
- ExecutorService
ExecutorService继承于Executor。它是Java异步目标任务的“执行者服务接”口,对外提供异步任务的接收服务。ExecutorService提供了“接收异步任务并转交给执行者”的方法,如submit系列方法、invoke系列方法等,具体如下:
//向线程池提交单个异步任务
<T> Future<T> submit(Callable<T> task);
//向线程池提交批量异步任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
- AbstractExecutorService
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。AbstractExecutorService存在的目的是为ExecutorService中的接口提供默认实现。
- ThreadPoolExecutor
ThreadPoolExecutor就是大名鼎鼎的“线程池”实现类,它继承于AbstractExecutorService抽象类。
- ScheduledExecutorService
ScheduledExecutorService是一个接口,它继承于ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口,其功能和Timer/TimerTask类似。
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,它提供了ScheduledExecutorService线程池接口中“延时执行”和“周期执行”等抽象调度方法的具体实现。
- Executors
Executors是一个静态工厂类,它通过静态工厂方法返回ExecutorService、ScheduledExecutorService等线程池示例对象,这些静态工厂方法可以理解为一些快捷的创建线程池的方法。
1.6.2 Executors的4种快捷创建线程池的方法
- newSingleThreadExecutor创建“单线程化线程池”
该方法用于创建一个“单线程化线程池”,也就是只有一个线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
public static final int SLEEP_GAP = 500;
//异步任务的执行目标类
static class TargetTask implements Runnable
{
static AtomicInteger taskNo = new AtomicInteger(1);
private String taskName;
public TargetTask()
{
taskName = "task-" + taskNo.get();
taskNo.incrementAndGet();
}
public void run()
{
Print.tco("任务:" + taskName + " doing");
// 线程睡眠一会
sleepMilliSeconds(SLEEP_GAP);
Print.tco(taskName + " 运行结束.");
}
}
//测试用例:只有一个线程的线程池
@Test
public void testSingleThreadExecutor()
{
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
sleepSeconds(1000);
//关闭线程池
pool.shutdown();
}
}
- newFixedThreadPool创建“固定数量的线程池”
```java
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
}public static final int SLEEP_GAP = 500;
//异步任务的执行目标类
static class TargetTask implements Runnable
{
//为了节约篇幅,省略重复内容
}
//测试用例:只有3个线程固定大小的线程池
@Test
public void testNewFixedThreadPool()
{
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
sleepSeconds(1000);
//关闭线程池
pool.shutdown();
}
// 省略其他
3. newCachedThreadPool创建“可缓存线程池”
该方法用于创建一个“可缓存线程池”,如果线程池内的某些线程无事可干成为空闲线程,“可缓存线程池”可灵活回收这些空闲线程。
```java
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
public static final int SLEEP_GAP = 500;
//异步任务的执行目标类
static class TargetTask implements Runnable
{
//为了节约篇幅,省略重复内容
}
//测试用例:“可缓存线程池”
@Test
public void testNewCacheThreadPool()
{
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
{
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
sleepSeconds(1000);
//关闭线程池
pool.shutdown();
}
// 省略其他
}
- newScheduledThreadPool创建“可调度线程池”
该方法用于创建一个“可调度线程池”,即一个提供“延时”和“周期性”任务调度功能的ScheduledExecutorService类型的线程池。Executors提供了多个创建“可调度线程池”的工厂方法
//方法一:创建一个可调度线程池,池内仅含有一个线程
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
//方法二:创建一个可调度线程池,池内含有N个线程,N的值为输入参数corePoolSize
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) ;
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
public static final int SLEEP_GAP = 500;
//异步任务的执行目标类
static class TargetTask implements Runnable
{
//为了节约篇幅,省略重复内容
}
//测试用例:“可调度线程池”
@Test
public void testNewScheduledThreadPool()
{
ScheduledExecutorService scheduled =
Executors.newScheduledThreadPool(2);
for (int i = 0; i < 2; i++)
{
scheduled.scheduleAtFixedRate(new TargetTask(),
0, 500, TimeUnit.MILLISECONDS);
// 以上参数中:0表示首次执行任务的延迟时间,500表示每次执行任务的间隔时间
// TimeUnit.MILLISECONDS是执行的时间间隔数值,单位为毫秒
}
sleepSeconds(1000);
//关闭线程池
scheduled.shutdown();
}
// 省略其他
}
ScheduleExecutorService中接收被调目标任务的方法之一scheduleAtFixedRate的定义如下:
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, //异步任务target执行目标实例
long initialDelay, //首次执行延时
long period, //两次开始执行最小间隔时间
TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量
);
ScheduleExecutorService中接收被调目标任务的方法之二scheduleWithFixedDelay的定义如下:
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, //异步任务target执行目标实例
long initialDelay, //首次执行延时
long delay, //前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量
);
1.6.3 线程池的标准创建方式
大部分企业的开发规范都会禁止使用快捷线程池(具体原因稍后介绍),要求通过标准构造器ThreadPoolExecutor去构造工作线程池。Executors工厂类中创建线程池的快捷工厂方法实际上是调用ThreadPoolExecutor(定时任务使用ScheduledThreadPoolExecutor)线程池的构造方法完成的。ThreadPoolExecutor构造方法有多个重载版本,其中一个比较重要的构造器如下:
// 使用标准构造器构造一个普通的线程池
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收
int maximumPoolSize, // 线程数的上限
long keepAliveTime,
TimeUnit unit, // 线程最大空闲(Idle)时长
BlockingQueue<Runnable> workQueue, // 任务的排队队列
ThreadFactory threadFactory,
- 核心和最大线程数量
参数corePoolSize用于设置核心(Core)线程池数量,参数maximumPoolSize用于设置最大线程数量。线程池执行器将会根据corePoolSize和maximumPoolSize自动维护线程池中的工作线程
- BlockingQueue
BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。
- keepAliveTime
线程构造器的keepAliveTime(空闲线程存活时间)参数用于设置池内线程最大Idle(空闲)时长(或者说保活时长),如果超过这个时间,默认情况下Idle、非Core线程会被回收。
1.6.4 向线程池提交任务的两种方式
向线程池提交任务的两种方式大致如下:
- 调用execute()方法 ```java //Executor 接口中的方法 void execute(Runnable command);
2. 调用submit()方法
```java
//ExecutorService 接口中的方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
以上的submit()和execute()两类方法的区别在哪里呢?大致有以下三点:
- Execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数。Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。
- submit()提交任务后会有返回值,而execute()没有
- submit()方便Exception处理
通过submit()返回的Future对象获取结果
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
// 省略其他
//测试用例:获取异步调用的结果
@Test
public void testSubmit2()
{
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
Future<Integer> future = pool.submit(new Callable<Integer>()
{
@Override
public Integer call() throws Exception
{
//返回200~300的随机数
return RandomUtil.randInRange(200, 300);
}
});
try
{
Integer result = future.get();
Print.tco("异步执行的结果是:" + result);
} catch (InterruptedException e)
{
Print.tco("异步调用被中断");
e.printStackTrace();
} catch (ExecutionException e)
{
Print.tco("异步调用过程中,发生了异常");
e.printStackTrace();
}
sleepSeconds(10);
//关闭线程池
pool.shutdown();
}
}
通过submit()返回的Future对象捕获异常
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
//异步任务的执行目标类
static class TargetTask implements Runnable
{
//为了节约篇幅,省略重复内容
}
//异步的执行目标类:执行过程中将发生异常
static class TargetTaskWithError extends TargetTask
{
public void run()
{
super.run();
throw new RuntimeException("Error from " + taskName);
}
}
//测试用例:提交和执行
@Test
public void testSubmit()
{
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
1.6.5 线程池的任务调度流程
下面是一个错误的线程池配置示例:
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
@org.junit.Test
public void testThreadPoolExecutor()
{
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, //corePoolSize
100, //maximumPoolSize
100, //keepAliveTime 空闲保活时长
TimeUnit.SECONDS, //空闲保活时长的单位
new LinkedBlockingDeque<>(100));//workQueue
//提交5个任务
for (int i = 0; i < 5; i++)
{
final int taskIndex = i;
executor.execute(() ->
{
Print.tco("taskIndex = " + taskIndex);
try
{ //极端测试:无限制睡眠
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e)
{
e.printStackTrace();
}
});
以上示例创建了最大线程数量maximumPoolSize为100的线程池,仅仅向其中提交了5个任务。理论上,这5个任务都会被执行到,奇怪的是示例中只有1个任务在执行,其他的4个任务都在等待。其他任务被加入到了阻塞队列中,需要等pool-1-thread-1线程执行完第一个任务后,才能依次从阻塞队列取出执行。但是,实例中的第一个任务是一个永远也没有办法完成的任务,所以其他的4个任务只能永远在阻塞队列中等待着。由于参数配置得不合理,因此出现了以上的奇怪现象。
- 核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象。
- 线程池的调度器创建线程的一条重要的规则是:在corePoolSize已满之后,还需要等阻塞队列已满,才会去创建新的线程。
1.6.6 ThreadFactory(线程工厂)
```java package java.util.concurrent; public interface ThreadFactory {
}//唯一的方法:创建一个新线程
Thread newThread(Runnable target);
下面的例子首先实现一个简单的线程工厂,然后基于该线程工厂快捷创建线程池,具体的代码如下:
```java
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
//一个简单的线程工厂
static public class SimpleThreadFactory implements ThreadFactory
{
static AtomicInteger threadNo = new AtomicInteger(1);
//实现其唯一的创建线程方法
@Override
public Thread newThread(Runnable target)
{
String threadName = "simpleThread-" + threadNo.get();
Print.tco("创建一个线程,名称为:" + threadName);
threadNo.incrementAndGet();
//设置线程名称和异步执行目标
Thread thread = new Thread(target,threadName);
//设置为守护线程
thread.setDaemon(true);
return thread;
}
}
//线程工厂的测试用例
@org.junit.Test
public void testThreadFactory()
{
//使用自定义线程工厂快捷创建一个固定大小的线程池
ExecutorService pool =
Executors.newFixedThreadPool(2,new SimpleThreadFactory());
for (int i = 0; i < 5; i++)
{
pool.submit(new TargetTask());
}
//等待10秒
sleepSeconds(10);
Print.tco("关闭线程池");
pool.shutdown();
}
// 省略其他
}
1.6.7 任务阻塞队列
Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:在阻塞队列为空时会阻塞当前线程的元素获取操作
Java线程池使用BlockingQueue实例暂时接收到的异步任务,BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- DelayQueue
- SynchronousQueue
1.6.8 调度器的钩子方法
ThreadPoolExecutor线程池调度器为每个任务执行前后都提供了钩子方法。 ```java //任务执行之前的钩子方法(前钩子) protected void beforeExecute(Thread t, Runnable r) { } //任务执行之后的钩子方法(后钩子) protected void afterExecute(Runnable r, Throwable t) { } //线程池终止时的钩子方法(停止钩子) protected void terminated() { }
1. beforeExecute:异步任务执行之前的钩子方法
1. afterExecute:异步任务执行之后的钩子方法
1. terminated:线程池终止时的钩子方法
为线程池定制钩子方法的示例,具体代码如下:
```java
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
@org.junit.Test
public void testHooks()
{
ExecutorService pool = new ThreadPoolExecutor(2, //coreSize
4, //最大线程数
60,//空闲保活时长
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2)) //等待队列
{
//继承:调度器终止钩子
@Override
protected void terminated()
{
Print.tco("调度器已经终止!");
}
//继承:执行前钩子
@Override
protected void beforeExecute(Thread t, Runnable target)
{
Print.tco( target +"前钩被执行");
//记录开始执行时间
startTime.set(System.currentTimeMillis());
super.beforeExecute(t, target);
}
//继承:执行后钩子
@Override
protected void afterExecute(Runnable target, Throwable t)
{
super.afterExecute(target, t);
//计算执行时长
long time = (System.currentTimeMillis() - startTime.get()) ;
Print.tco( target + " 后钩被执行, 任务执行时长(ms):" + time);
//清空本地变量
startTime.remove();
}
};
for (int i = 1; i <= 5; i++)
{
pool.execute(new TargetTask());
}
//等待10秒
sleepSeconds(10);
Print.tco("关闭线程池");
pool.shutdown();
}
// 省略其他
}
1.6.9 线程池的拒绝策略
在线程池的任务缓存队列为有界队列(有容量限制的队列)的时候,如果队列满了,提交任务到线程池的时候就会被拒绝。总体来说,任务被拒绝有两种情况:
- 线程池已经被关闭
- 工作队列已满且maximumPoolSize已满。
无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的rejectedExecution方法。RejectedExecutionHandler是拒绝策略的接口,JUC为该接口提供了以下几种实现:
- AbortPolicy:拒绝策略。
- DiscardPolicy:抛弃策略。
- DiscardOldestPolicy:抛弃最老任务策略。
- CallerRunsPolicy:调用者执行策略。
- 自定义策略。
- AbortPolicy
使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略。
- DiscardPolicy
该策略是AbortPolicy的Silent(安静)版本,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。
- DiscardOldestPolicy
抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。
- CallerRunsPolicy
调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。
- 自定义策略
如果以上拒绝策略都不符合需求,那么可自定义一个拒绝策略,实现RejectedExecutionHandler接口的rejectedExecution方法即可。
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
//一个简单的线程工厂
static public class SimpleThreadFactoryimplements ThreadFactory
{
//为了节约篇幅,省略重复内容
}
//自定义拒绝策略
public static class CustomIgnorePolicy
implements RejectedExecutionHandler
{
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
// 可做日志记录等
Print.tco(r + " rejected; " +
" - getTaskCount: " + e.getTaskCount());
}
}
@org.junit.Test
public void testCustomIgnorePolicy()
{
int corePoolSize = 2; //核心线程数
int maximumPoolSize = 4; //最大线程数
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
//最大排队任务数
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
//线程工厂
ThreadFactory threadFactory = new SimpleThreadFactory();
//拒绝和异常处理策略
RejectedExecutionHandler policy = new CustomIgnorePolicy();
ThreadPoolExecutor pool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime, unit,
workQueue,
threadFactory,
policy);
// 预启动所有核心线程
pool.prestartAllCoreThreads();
for (int i = 1; i <= 10; i++)
{
pool.execute(new TargetTask());
}
//等待10秒
sleepSeconds(10);
Print.tco("关闭线程池");
pool.shutdown();
}
// 省略其他
}
1.6.10 线程池的优雅关闭
一般情况下,线程池启动后建议手动关闭。在介绍线程池的优雅关闭之前,我们先了解一下线程池的状态。线程池总共存在5种状态,定义在ThreadPoolExecutor类中,具体代码如下:
package java.util.concurrent;
// 省略import
public class ThreadPoolExecutor extends AbstractExecutorService {
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 省略其他
}
- RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务。
- SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕。
- STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程。
- TIDYING:该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法。
- TERMINATED:执行完terminated()钩子方法之后的状态。
线程池的状态转换规则为:
- 线程池创建之后状态为RUNNING。
- 执行线程池的shutdown()实例方法,会使线程池状态从RUNNING转变为SHUTDOWN。
- 执行线程池的shutdownNow()实例方法,会使线程池状态从RUNNING转变为STOP。
- 当线程池处于SHUTDOWN状态时,执行其shutdownNow()方法会将其状态转变为STOP。
- 等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从STOP转变为TIDYING。
- 执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED。
优雅地关闭线程池主要涉及的方法有3个:
- shutdown:是JUC提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务。
- shutdownNow:是JUC提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。
- awaitTermination:等待线程池完成关闭。在调用线程池的shutdown()与shutdownNow()方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。
- shutdown()方法的原理
```java
public void shutdown()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
} finally {// 检查权限
checkShutdownAccess();
// 设置线程池状态
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 钩子函数,主要用于清理一些资源
onShutdown();
} tryTerminate(); }mainLock.unlock();
Shutdown()方法首先加锁,其次检查调用者是否用于执行线程池关闭的Java Security权限。接着shutdown()方法会将线程池状态变为SHUTDOWN,在这之后线程池不再接受提交的新任务。此时如果还继续往线程池提交任务,将会使用线程池拒绝策略响应,默认的拒绝策略将会使用ThreadPoolExecutor.AbortPolicy,接收新任务时会抛出RejectedExecutionException异常。
2. shutdownNow()方法的原理
```java
public List<Runnable> shutdownNow()
{
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
// 检查状态
checkShutdownAccess();
// 将线程池状态变为 STOP
advanceRunState(STOP);
// 中断所有线程,包括工作线程以及空闲线程
interruptWorkers();
// 丢弃工作队列中的剩余任务
tasks = drainQueue();
} finally
{
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow()方法将会把线程池状态设置为STOP,然后中断所有线程(包括工作线程以及空闲线程),最后清空工作队列,取出工作队列所有未完成的任务返回给调用者。与有序的shutdown()方法相比,shutdownNow()方法比较粗暴,直接中断工作线程。不过这里需要注意的是,中断线程并不代表线程立刻结束,只是通过工作线程的interrupt()实例方法设置了中断状态,这里需要用户程序主动配合线程进行中断操作。
- awaitTermination()方法的使用
调用了线程池的shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成,如果需要等待线程池关闭完成,需要调用awaitTermination()进行主动等待。调用方法大致如下:
threadPool.shutdown();
try {
//一直等待,直到线程池完成关闭
while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
System.out.println("线程池任务还未执行结束");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
优雅地关闭线程池
public static void shutdownThreadPoolGracefully(ExecutorService threadPool) {
if (!(threadPool instanceof ExecutorService) || threadPool.isTerminated()) {
return;
}
try {
threadPool.shutdown(); //拒绝接受新任务
} catch (SecurityException e) {
return;
} catch (NullPointerException e) {
return;
}
try {
// 等待 60 s,等待线程池中的任务完成执行
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
// 调用 shutdownNow 取消正在执行的任务
threadPool.shutdownNow();
// 再次等待 60 s,如果还未结束,可以再次尝试,或则直接放弃
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池任务未正常执行结束");
}
}
} catch (InterruptedException ie) {
// 捕获异常,重新调用 shutdownNow
threadPool.shutdownNow();
}
//任然没有关闭,循环关闭1000次,每次等待10毫秒
if (!threadPool.isTerminated()) {
try {
for (int i = 0; i < 1000; i++) {
if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
System.err.println(e.getMessage());
} catch (Throwable e) {
System.err.println(e.getMessage());
}
}
}
注册JVM钩子函数自动关闭线程池
如果使用了线程池,可以在JVM中注册一个钩子函数,在JVM进程关闭之前,由钩子函数自动将线程池优雅地关闭,以确保资源正常释放。
package com.crazymakercircle.util;
// 省略import
public class ThreadUtil
{
//懒汉式单例创建线程池:用于执行定时、顺序任务
static class SeqOrScheduledTargetThreadPoolLazyHolder
{
//线程池:用于定时任务、顺序排队执行任务
static final ScheduledThreadPoolExecutor EXECUTOR =
new ScheduledThreadPoolExecutor( 1,
new CustomThreadFactory("seq"));
static
{
//注册JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("定时和顺序任务线程池",
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
//优雅地关闭线程池
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
}));
}
}
// 省略不相干代码
}
1.6.11 Executors快捷创建线程池的潜在问题
- 使用Executors创建“固定数量的线程池”的潜在问题
```java
public static ExecutorService newFixedThreadPool(int nThreads)
{
return new ThreadPoolExecutor(
nThreads, // 核心线程数
nThreads, // 最大线程数
0L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, // 时间单位:毫秒
new LinkedBlockingQueue<Runnable>() //任务的排队队列,无界队列
);
}
使用Executors创建“固定数量的线程池”的潜在问题主要存在于其workQueue上,其值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列中大量的任务等待。如果队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽。
2. 使用Executors创建“单线程化线程池”的潜在问题
```java
public static ExecutorService newSingleThreadExecutor()
{
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1, // 核心线程数
1, // 最大线程数
0L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, //时间单位:毫秒
new LinkedBlockingQueue<Runnable>() //无界队列
));
}
为了演示“单线程化线程池”的corePoolSize始终保持为1而不能被修改,接下来首先调用newSingleThreadExecutor()工厂方法创建一个“单线程化线程池”,然后试图修改其corePoolSize属性,具体的代码如下:
@org.junit.Test
public void testNewFixedThreadPool2()
{
//创建一个固定大小的线程池
ExecutorService fixedExecutorService =
Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor) fixedExecutorService;
Print.tco(threadPoolExecutor.getMaximumPoolSize());
//设置核心线程数
threadPoolExecutor.setCorePoolSize(8);
//创建一个单线程化的线程池
ExecutorService singleExecutorService =
Executors.newSingleThreadExecutor();
//转换成普通线程池,会抛出运行时异常 java.lang.ClassCastException
((ThreadPoolExecutor) singleExecutorService).setCorePoolSize(8);
}
使用Executors创建的“单线程化线程池”与“固定大小的线程池”一样,其潜在问题仍然存在于其workQueue属性上,该属性的值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列大量阻塞。如果队列很大,很有可能导致JVM的OOM异常,甚至造成内存资源耗尽。
- 使用Executors创建“可缓存线程池”的潜在问题
```java
public static ExecutorService newCachedThreadPool()
{
return new ThreadPoolExecutor(
); }0, // 核心线程数
Integer.MAX_VALUE, // 最大线程数
60L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, // 时间单位:毫秒
new SynchronousQueue<Runnable>() // 任务的排队队列,无界队列
4.使用Executors创建“可调度线程池”的潜在问题
<a name="t1W2U"></a>
# 1.7 确定线程池的线程数
<a name="i1hIu"></a>
## 1.7.1 按照任务类型对线程池进行分类
1. IO密集型任务
1. CPU密集型任务
1. 混合型任务
<a name="i8LmW"></a>
## 1.7.2 为IO密集型任务确定线程数
由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,以提高CPU的使用率。<br />Netty的IO处理任务就是典型的IO密集型任务。所以,Netty的Reactor(反应器)实现类(定制版的线程池)的IO处理线程数默认正好为CPU核数的两倍,以下是其相关的代码:
```java
//多线程版本Reactor实现类
public abstract class MultithreadEventLoopGroup extends
MultithreadEventExecutorGroup implements EventLoopGroup {
//IO事件处理线程数
private static final int DEFAULT_EVENT_LOOP_THREADS;
//IO事件处理线程数默认值为CPU核数的两倍
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
}
/**
*构造器
*/
protected MultithreadEventLoopGroup(int nThreads,
ThreadFactory threadFactory, Object... args) {
super(nThreads == 0?
DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
// 省略其他
}
package com.crazymakercircle.util;
// 省略import
public class ThreadUtil
{
//CPU核数
public static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//IO处理线程数
public static final int IO_MAX = Math.max(2, CPU_COUNT * 2);
/**
* 空闲保活时限,单位秒
*/
private static final int KEEP_ALIVE_SECONDS = 30;
/**
* 有界队列size
*/
private static final int QUEUE_SIZE = 128;
//懒汉式单例创建线程池:用于IO密集型任务
public class IoIntenseTargetThreadPoolLazyHolder {
//线程池: 用于IO密集型任务
public static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
IO_MAX,
IO_MAX,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue(QUEUE_SIZE),
new ThreadUtil.CustomThreadFactory("io"));
public static ThreadPoolExecutor getInnerExecutor() {
return EXECUTOR;
}
static {
log.info("线程池已经初始化");
EXECUTOR.allowCoreThreadTimeOut(true);
//JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("IO密集型任务线程池", new Callable<Void>() {
@Override
public Void call() throws Exception {
//优雅关闭线程池
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
}));
}
}
1.7.3 为CPU密集型任务确定线程数
CPU密集型任务并行执行的数量应当等于CPU的核心数。
package com.crazymakercircle.util;
// 省略import
public class ThreadUtil
{
//CPU核数
private static final int CPU_COUNT =
Runtime.getRuntime().availableProcessors();
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT;
//懒汉式单例创建线程池:用于CPU密集型任务
private static class CpuIntenseTargetThreadPoolLazyHolder
{
//线程池:用于CPU密集型任务
private static final ThreadPoolExecutor EXECUTOR =
new ThreadPoolExecutor(
MAXIMUM_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue(QUEUE_SIZE),
new CustomThreadFactory("cpu"));
static
{
EXECUTOR.allowCoreThreadTimeOut(true);
//JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("CPU密集型任务线程池",
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
//优雅地关闭线程池
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
}));
}
}
// 省略不相干代码
}
1.7.4 为混合型任务确定线程数
最佳线程数 = ((线程等待时间+线程CPU时间) / 线程CPU时间) * CPU核数
通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。
package com.crazymakercircle.util;
// 省略import
public class ThreadUtil
{
private static final int MIXED_MAX = 128; //最大线程数
private static final String MIXED_THREAD_AMOUNT = "mixed.thread.amount";
//懒汉式单例创建线程池:用于混合型任务
private static class MixedTargetThreadPoolLazyHolder
{
//首先从环境变量 mixed.thread.amount 中获取预先配置的线程数
//如果没有对 mixed.thread.amount进行配置,就使用常量 MIXED_MAX作为线程数
private static final int max =
(null != System.getProperty(MIXED_THREAD_AMOUNT)) ?
Integer.parseInt(System.getProperty(MIXED_THREAD_AMOUNT))
在使用如上代码创建混合型线程池时,建议按照前面的最佳线程数估算公式提前预估好线程数(如80),然后设置在环境变量mixed.thread.amount中,测试用例如下:
package com.crazymakercircle.multithread.basic.create3;
// 省略import
public class CreateThreadPoolDemo
{
@org.junit.Test
public void testMixedThreadPool()
{
System.getProperties().put("mixed.thread", 600);
// 获取自定义的混合线程池
ExecutorService pool =
ThreadUtil.getMixedTargetThreadPool();
for (int i = 0; i < 1000; i++)
{
try
{
sleepMilliSeconds(10);
pool.submit(new TargetTask());
} catch (RejectedExecutionException e)
{
//异常处理
}
}
//等待10s
sleepSeconds(10);
Print.tco("关闭线程池");
}
// 省略其他
}
1.8 ThreadLocal原理与实战
在Java的多线程并发执行过程中,为了保证多个线程对变量的安全访问,可以将变量放到ThreadLocal类型的对象中,使变量在每个线程中都有独立值,不会出现一个线程读取变量时被另一个线程修改的现象。ThreadLocal类通常被翻译为“线程本地变量”类或者“线程局部变量”类。
1.8.1 ThreadLocal的基本使用
下面的例子通过ThreadLocal的成员方法进行“线程本地变量”中线程本地值的设置、获取和移除,具体的代码如下:
package com.crazymakercircle.multithread.basic.threadlocal;
// 省略import
public class ThreadLocalTest
{
@Data
static class Foo
{
//实例总数
static final AtomicInteger AMOUNT = new AtomicInteger(0);
//对象的编号
int index = 0;
//对象的内容
int bar = 10;
//构造器
public Foo()
{
index = AMOUNT.incrementAndGet(); //总数增加,并且给对象编号
}
@Override
public String toString()
{
return index + "@Foo{bar=" + bar + '}';
}
}
//定义线程本地变量
private static final ThreadLocal<Foo> LOCAL_FOO = new ThreadLocal<Foo>();
public static void main(String[] args) throws InterruptedException
{
//获取自定义的混合型线程池
ThreadPoolExecutor threadPool =
ThreadUtil.getMixedTargetThreadPool();
//提交5个任务,将会用到5个线程
for (int i = 0; i < 5; i++)
{
threadPool.execute(new Runnable()
{
@Override
public void run()
{
//获取“线程本地变量”中当前线程所绑定的值
if (LOCAL_FOO.get() == null)
{
//设置“线程本地变量”中当前线程所绑定的值
LOCAL_FOO.set(new Foo());
}
Print.tco("初始的本地值:" + LOCAL_FOO.get());
//每个线程执行10次
for (int i = 0; i < 10; i++)
{
Foo foo = LOCAL_FOO.get();
foo.setBar(foo.getBar() + 1); //值增1
sleepMilliSeconds(10);
}
Print.tco("累加10次之后的本地值:" + LOCAL_FOO.get());
//删除“线程本地变量”中当前线程所绑定的值
LOCAL_FOO.remove(); //这点对于线程池中的线程尤其重要
}
});
}
}
}
在当前线程尚未绑定值时,如果希望从线程本地变量获取到初始值,而且不想采用以上的“判空后设值”这种相对烦琐的方式,可以调用ThreadLocal.withInitial(…)静态工厂方法,在定义ThreadLocal对象时设置一个获取初始值的回调函数,具体的代码如下:
ThreadLocal<Foo> LOCAL_FOO = ThreadLocal.withInitial(() -> new Foo());
以上代码并没有使用new ThreadLocal
1.8.2 ThreadLocal的使用场景
- 线程隔离
-
1.8.3 使用ThreadLocal进行线程隔离
下面的代码来自Hibernate,代码中通过ThreadLocal进行数据库连接(Session)的“线程本地化”存储,主要的代码如下: ```java private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (s == null) {
s = getSessionFactory().openSession();
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}
<a name="GpaZW"></a>
## 1.8.4 使用ThreadLocal进行跨函数数据传递
```java
package com.crazymaker.springcloud.common.context;
// 省略import
public class SessionHolder
{
// session id,线程本地变量
private static final ThreadLocal<String> sidLocal =
new ThreadLocal<>("sidLocal");
// 用户信息,线程本地变量
private static final ThreadLocal<UserDTO> sessionUserLocal =
new ThreadLocal<>("sessionUserLocal");
// session,线程本地变量
private static final ThreadLocal<HttpSession> sessionLocal =
new ThreadLocal<>("sessionLocal");
// 省略其他
/**
*保存session在线程本地变量中
*/
public static void setSession(HttpSession session)
{
sessionLocal.set(session);
}
/**
* 取得绑定在线程本地变量中的session
*/
public static HttpSession getSession()
{
HttpSession session = sessionLocal.get();
Assert.notNull(session, "session未设置");
return session;
}
// 省略其他
}
1.8.5 ThreadLocal内部结构演进
1.8.6 ThreadLocal源码分析
set(T value)方法 ```java public void set(T value) {
//获取当前线程对象
Thread t = Thread.currentThread();
//获取当前线程的ThreadLocalMap 成员
ThreadLocalMap map = getMap(t);
//判断map是否存在
if (map != null)
{
//value被绑定到threadLocal实例
map.set(this, value);
}
else
{
// 如果当前线程没有ThreadLocalMap成员实例
// 创建一个ThreadLocalMap实例,然后作为成员关联到t(thread实例)
createMap(t, value);
}
}
// 获取线程t的ThreadLocalMap成员
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
// 线程t创建一个ThreadLocalMap成员
// 并为新的Map成员设置第一个“Key-Value对”,Key为当前的ThreadLocal实例
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
2. get()方法
```java
public T get() {
// 获得当前线程对象
Thread t = Thread.currentThread();
// 获得线程对象的ThreadLocalMap 内部成员
ThreadLocalMap map = getMap(t);
// 如果当前线程的内部map成员存在
if (map != null) {
// 以当前ThreadLocal为Key,尝试获得条目
ThreadLocalMap.Entry e = map.getEntry(this);
// 条目存在
if (e != null) {
T result = (T)e.value;
return result;
}
}
// 如果当前线程对应的map不存在
// 或者map存在,但是当前ThreadLocal实例没有对应的“Key-Value对”,返回初始值
return setInitialValue();
}
// 设置ThreadLocal关联的初始值并返回
private T setInitialValue() {
// 调用初始化钩子函数,获取初始值
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
- remove()方法
```java
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
4. initialValue()方法
```java
protected T initialValue() {
return null;
}
使用工厂方法构造ThreadLocal实例的代码如下:
ThreadLocal<Foo> LOCAL_FOO = ThreadLocal.withInitial(() -> new Foo());
JDK定义的ThreadLocal.withInitial(…)静态工厂方法及其内部子类SuppliedThreadLocal的源码如下:
//ThreadLocal工厂方法可以设置本地变量初始值钩子函数
public static <S> ThreadLocal<S> withInitial(
Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}
//内部静态子类
//继承了ThreadLocal,重写了initialValue()方法,返回钩子函数的值作为初始值
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
//保存钩子函数
private final Supplier<? extends T> supplier;
//传入钩子函数
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get(); //返回钩子函数的值作为初始值
}
}
1.8.7 ThreadLocalMap源码分析
- ThreadLocalMap的主要成员变量
```java
public class ThreadLocal
{
static class ThreadLocalMap {// 省略其他
}// Map的条目数组,作为哈希表使用
private Entry[] table;
// Map的条目初始容量16
private static final int INITIAL_CAPACITY = 16;
// Map的条目数量
private int size = 0;
// 扩容因子
private int threshold;
// Map的条目类型,一个静态的内部类
// Entry 继承子WeakReference, Key为ThreadLocal实例
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value; //条目的值
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// 省略其他
2. Entry的Key需要使用弱引用
```java
// Entry 继承了WeakReference,并使用WeakReference对Key进行包装
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value; //值
Entry(ThreadLocal<?> k, Object v) {
super(k); //使用WeakReference对Key进行包装
value = v;
}
}
什么是弱引用呢?仅有弱引用(Weak Reference)指向的对象只能生存到下一次垃圾回收之前。换句话说,当GC发生时,无论内存够不够,仅有弱引用所指向的对象都会被回收。而拥有强引用指向的对象则不会被直接回收。
编程规范推荐使用static final修饰ThreadLocal对象
1.8.8 ThreadLocal综合使用案例
尽量使用private static final修饰ThreadLocal实例。使用private与final修饰符主要是为了尽可能不让他人修改、变更ThreadLocal变量的引用,使用static修饰符主要是为了确保ThreadLocal实例的全局唯一。
ThreadLocal使用完成之后务必调用remove()方法。这是简单、有效地避免ThreadLocal引发内存泄漏问题的方法。 ```java package com.crazymakercircle.multithread.basic.threadlocal; // 省略import public class ThreadLocalTest2 {
/**
* 模拟业务方法
*/
public void serviceMethod()
{
//睡眠500毫秒,模拟执行所需的时间(耗时)
sleepMilliSeconds(500);
//记录从开始调用到当前这个点( "point-1")的耗时
SpeedLog.logPoint("point-1 service");
//调用DAO()方法:模拟DAO业务方法
daoMethod();
//调用RPC()方法:模拟RPC远程业务方法
rpcMethod();
}
/**
* 模拟DAO业务方法
*/
public void daoMethod()
{
//睡眠400毫秒,模拟执行所需的时间
sleepMilliSeconds(400);
//记录上一个点("point-1")到这里("point-2")的耗时
SpeedLog.logPoint("point-2 dao");
}
/**
* 模拟RPC远程业务方法
*/
public void rpcMethod()
{
//睡眠400毫秒,模拟执行所需的时间
sleepMilliSeconds(600);
//记录上一个点("point-2")到这里("point-3")的耗时
SpeedLog.logPoint("point-3 rpc");
}
// 省略不相干代码
}
SpeedLog类的代码大致如下:
```java
package com.crazymakercircle.multithread.basic.threadlocal;
// 省略import
public class SpeedLog
{
/**
* 记录调用耗时的本地Map变量
*/
private static final ThreadLocal<Map<String, Long>>
TIME_RECORD_LOCAL =ThreadLocal.withInitial(SpeedLog::initialStartTime);
/**
* 记录调用耗时的本地Map变量的初始化方法
*/
public static Map<String, Long> initialStartTime()
{
Map<String, Long> map = new HashMap<>();
map.put("start", System.currentTimeMillis());
map.put("last", System.currentTimeMillis());
return map;
}
/**
* 开始耗时记录
*/
public static final void beginSpeedLog()
{
Print.fo("开始耗时记录");
TIME_RECORD_LOCAL.get();
}
/**
* 结束耗时记录
*/
public static final void endSpeedLog()
{
TIME_RECORD_LOCAL.remove();
Print.fo("结束耗时记录");
}
/**
* 耗时埋点
*/
public static final void logPoint(String point)
{
//获取上一次的时间
Long last = TIME_RECORD_LOCAL.get().get("last");
//计算上一次埋点到当前埋点的耗时
Long cost = System.currentTimeMillis() - last;
//保存上一次埋点到当前埋点的耗时
TIME_RECORD_LOCAL.get().put(point + " cost:", cost);
//保存当前时间,供下一次埋点使用
TIME_RECORD_LOCAL.get().put("last", System.currentTimeMillis());
}
// 省略不相干代码
}