04. 线程池
1. 线程池的含义
/*
* 什么是线程池
* 线程池其实就是一种多线程处理像是,处理过程中可以将任务添加到队列中,然后再创建线程后自动启用这些任务.
* 这里的线程就是之前学过的线程,这里的任务就是实现了Runnable和callable接口的实例对象*/
/*
* 为什么要是使用线程池
* 使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,
* 且库对所有的线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行的压力,当然了,使用线程池的原因不仅仅只有这些,
* 我们可以从线程池自身的优点上来进一步了解下线程池的好处*/
/*
* 使用线程池的好处
* 1.线程和任务分开,提高线程的重用性
* 2.控制线程并发数量,降低服务器压力,统一管理所有线程
* 3.提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,
* 销毁线程用的时间为T3,那么使用线程就免去了T1和T3的时间*/
/*
* 线程池的应用场景
* 1.网购商品秒杀
* 2.云盘文件上传和下载
* 3.12306网上购票系统等
* 总之:只要有并发的地方,任务数量大或小,每个任务执行的时间长与短都可以使用线程池;
* 只不过在使用线程池的时候,注意一下设置合理的线程池大小即可 */
/*
* 通过观察java中内置的线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个号的线程池,
* 就必须合理的设置线程池的4个参数,那么到底该如何合理的设计4个参数的值
* 1.核心线程数(corePoolSize)
* 核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,一般按照8020的原则设计
* ,即按照百分之80的情况设置核心线程数,声息的百分之20可以利用最大线程数处理
* 2.任务队列长度(workQueue)
* 任务队列长度一般设计为:(核心线程数/单个执行时间)*2
* 3.最大线程数(maximumPoolSize)
* 最大线程数的设计除了需要参照核心线程数的条件外,还需要参照每秒产生的最大任务数决定
* 例如:如果系统每秒产生的任务是1000个,那么最大线程数 = (最大任务数 - 任务队列长度) * 单个任务执行时间
* 即:最大线程数 = (1000 -200)*0.1 = 80个
* 4.最大空闲时间(keepAliveTime)
* 这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,开发者可以根据经验和系统产生任务的时间
* 间隔来合理设置一个值即可
* 注意:上面4个参数的设置只是一般的设计原则,并不是固定的,开发者可以根据实际情况灵活调整*/
2. java内置线程池-ExecutorService
2.1 newCachedThreadPool : 创建一个默认的线程池对象
作用是:创建一个默认的线程池对象,里面的线程可以重用,且在第一次使用时才创建
public static void main(String[] args) {
/*
* java内置线程池-ExecutorService
* ExecutorService接口是java内置的线程池接口
* 常用方法:
* void shutdown() 启用一次顺序关闭,执行以前提交的任务,但不接受新任务
* List<Runnable> shutdownNow() 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表
* <T>Future<T> submit(Callable<T> task) 执行带返回值的任务,返回一个 Future对象
* */
// fn1();
fn2();
}
public static void fn1() {
// 1.创建一个默认的线程池对象,里面的线程可以重用,且在第一次使用时才创建
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
es.submit(new ThreadFn(i));
}
es.shutdown();
}
public static void fn2() {
// 1.使用工厂类获取线程池对象
ExecutorService es =
Executors.newCachedThreadPool(
new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义线程名称" + n++);
}
});
// 2.提交任务
for (int i = 0; i < 20; i++) {
es.submit(new ThreadFn(i));
}
es.shutdown();
}
}
2.2 newFixedThreadPool:创建一个可重用固定线程数的线程池
public class PoolTest2 {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
// fn1();
fn2();
}
public static void fn1() {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 30; i++) {
es.submit(new ThreadFn(i));
}
es.shutdown();
}
public static void fn2() {
ExecutorService es =
Executors.newFixedThreadPool(
5,
new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义名称" + i++);
}
});
for (int i = 0; i < 20; i++) {
es.submit(new ThreadFn(i));
}
es.shutdown();
}
}
2.3 newSingleThreadExecutor: 创建一个使用单个worker线程的Executor
public class PoolTest3 {
public static void main(String[] args) {
// 创建一个使用单个worker线程的Executor,以无界队列的方式来运行该程序
// fn1();
fn2();
}
public static void fn1() {
ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
es.submit(new ThreadFn(i));
}
es.shutdown();
}
public static void fn2() {
ExecutorService es =
Executors.newSingleThreadExecutor(
new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义线程名" + i++);
}
});
for (int i = 0; i < 10; i++) {
es.submit(new ThreadFn(i));
}
// es.shutdown();
// 测试shutdown和shutdownNow的区别
// shutdownNow会将未完成的任务用集合返回出来
List<Runnable> rs = es.shutdownNow();
System.out.println(rs);
}
}
测试的Run方法
public class ThreadFn implements Runnable {
private int id;
public ThreadFn(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行了任务" + id);
}
3. ScheduleExecutorService是ExecutorService的子接口
3.1 newScheduledThreadPool: 创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务
public class PoolTest1 {
// ScheduleExecutorService是ExecutorService的子接口,具备了延迟运行或者定期执行任务的能力
// 创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务
public static void main(String[] args) {
// fn1();
fn2();
}
public static void fn1() {
ScheduledExecutorService ss = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
// 给这个线程任务延时,定时2秒后执行,不是每个都延时了2秒,而是全部在2秒后开始运行
ss.schedule(new ThreadFn(i), 2, TimeUnit.SECONDS);
}
ss.shutdown();
}
public static void fn2() {
ScheduledExecutorService ss =
Executors.newScheduledThreadPool(
3,
new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义线程名" + i++);
}
});
for (int i = 0; i < 5; i++) {
// scheduleAtFixedRate,有4个参数 : 该方法意味着无限定期的执行
// 1.为线程任务 2.开始延时时间 3.多长时期执行一次 4.时间单位
ss.scheduleAtFixedRate(new ThreadFn(i), 2, 5, TimeUnit.SECONDS);
}
// ss.shutdown();
}
}
3.2 newSingleThreadScheduledExecutor : 创建一个单线程执行程序,它允许在给定的延迟后运行命令或者定期的执行
public class PoolTest2 {
// 创建一个单线程执行程序,它允许在给定的延迟后运行命令或者定期的执行
// newSingleThreadScheduledExecutor()
public static void main(String[] args) {
// fn1();
fn2();
}
public static void fn1() {
ScheduledExecutorService se = Executors.newSingleThreadScheduledExecutor();
for (int i = 0; i < 5; i++) {
se.schedule(new ThreadFn(i), 2, TimeUnit.SECONDS);
}
se.shutdown();
}
public static void fn2() {
ScheduledExecutorService se =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义线程名" + i++);
}
});
for (int i = 0; i < 5; i++) {
se.scheduleAtFixedRate(new ThreadFn(i), 2, 5, TimeUnit.SECONDS);
}
// se.shutdown();
}
}
测试用的Run方法
public class ThreadFn implements Runnable {
private int id;
public ThreadFn(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行了任务" + id);
}
}
4. 线程池的拒绝策略
public class Pool1 {
/*ThreadPoolExecutor 构造方法的最后一个参数指定了拒绝策略.当提交给线程池的任务量超过实际承载能力时,如何处理?
即线程池中 的线程已经用完了,等待队列也满了,无法为新提交的任务服务,
可以 通过拒绝策略来处理这个问题. JDK提供了四种拒绝策略:
AbortPolicy 策略,会抛出异常
CallerRunsPolicy 策略,只要线程池没关闭,会在调用者线程中运行当前被丢弃的任务
DiscardOldestPolicy 将任务队列中最老的任务丢弃,尝试再次提交新任务
DiscardPolicy 直接丢弃这个无法处理的任务
Executors 工具类提供的静态方法返回的线程池默认的拒绝策略是AbortPolicy 抛出异常,
如果内置的拒绝策略无法满足实际需求,可以扩展RejectedExecutionHandler 接口*/
public static void main(String[] args) {
NewPool n = new NewPool(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadPoolExecutor newPool = n.getNewPool();
for (int i = 0; i < 100; i++) {
newPool.submit(new TestRun());
}
newPool.shutdown();
}
}
自定义线程池
public class NewPool {
private ThreadPoolExecutor te;
public NewPool(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this.te =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// r就是请求的任务, executor就是当前线程池
System.out.println(r + "执行拒绝策略");
}
});
}
public ThreadPoolExecutor getNewPool() {
return te;
}
}
测试Run方法
public class TestRun implements Runnable {
// 定义任务
@Override
public void run() {
int num = new Random().nextInt(5);
System.out.println(
Thread.currentThread().getName() + "--" + System.currentTimeMillis() + "开始睡眠" + num + "秒");
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}