1、线程池的自我介绍
线程池的重要性:面试和工作当中都会出现
什么是池
软件中的“池”,可以理解为计划经济。
第一个好处:复用线程,
第二个好处:控制资源总量。
如果不使用线程池,每个任务都新开一个线程处理
- 一个线程
- for循环
- 任务数量上升到1000个
Java中的线程对应操作系统中的线程,这样的开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题。
当存在一个任务,来一个任务就处理一个异常。
public class EveryTaskOneThread {
public static void main(String[] args) {
Thread thread = new Thread(new Task());
thread.start();
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
多个任务采用循环方式创建线程
public class ForLoop {
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(new Task());
thread.start();
}
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
1.1 为什么需要使用线程池?
问题1:反复创建开销大
问题2:过多的线程会占用过多内存
1.2 线程池好处
- 加快响应速度
- 合理利用CPU和内存
- 统一管理
1.3 线程池适合应用场合
服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。
2、创建和停止线程池
2.1 创建线程池
2.1.1 线程池构造函数的参数
首先我们要理清线程池各个类之间的关系。
ExecutorService继承自Executor,ThreadPoolExecutor实现了ExcutorService。
而Executors并不是线程池的实现,复数形式的Executor其实本质上是一个工具类,能够返回特定功能的线程池,其实内部都是创建的ThreadPoolExecutor对象,参数不同而已,并且方法的返回值是ExecutorService,这个也是没问题的,用接口类接收实现类的对象,是被允许的。
2.1.2 参数中的corePoolSize和maxPoolSize
corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中没有执行任何线程,线程池会等待有任务到来时,再创建新线程去执行任务。
线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这个最大的量maxPoolSize。即使没有任务也不会减少到核心线程数以下。
添加线程规则
1、如果线程小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。
2、如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。
3、如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务。
4、如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务。
是否需要增加线程的判断顺序是:corePoolSize -> workQueue -> maxPoolSize
举个例子:线程池:核心池大小为5,最大池大小为10,队列为100
因为线程中的请求最多会创建五个然后任务将被添加到队列中,直到达到100,当队列满时,将会创建最新的线程maxPoolSize,最多到10个线程,如果再来任务就拒绝
2.1.3 增减线程的特点
- 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
- 通过设置maximumPoolSize为很高的值,例如Integer。MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
- 是只有在队列填满时才能创建多余corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize;
2.1.4 keepAliveTime和线程工厂
keepAliveTime:如果线程池当前的线程多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。
ThreadFactory 用来创建线程,新的线程是由ThreadFactory创建,默认使用Executors.defaultThreadFactory,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那就可以改变线程名、线程组、优先级、是否是守护线程等。
通常我们使用默认的ThreadFactory就可以了,以下是默认线程工厂源码:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
2.1.5 工作队列
有三种最常见的队列类型:
1)直接交接:SynchronousQueue,没有队列进行任务缓存,队列不保存任务,需要把maximumPoolSize设置大一点。
2) 无界队列:LinkedBlockingQueue,队列是无界的,maximumPoolSize设置多大都没用,因为队列永远填不满,所以线程数永远不会超过corePoolSize。
3) 有界的队列:ArrayBlockingQueue,maximumPoolSize有意义了,当线程数达到corePoolSize并且队列满了,就回去创建新线程,直到maximumPoolSize个线程。
2.2 内存溢出的情况
线程池应该手动创建还是自动创建。
设置运行内存,-Xmx8m -Xms8m
- 手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽风险。
- 让我们来看看自动创建线程池(也就是直接调用JDK封装好的构造函数)可能带来哪些问题。
newFixedThreadPool
由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,容易造成占用大量的内存,可能会导致OOM。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor
可以看出,这里和刚刚的newFiexdThreadPool的原理基本一样,只不过把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool
可缓存的线程池,特点:无界线程池,具有自动回收多余线程的功能。使用SynchronousQueue,可缓存的,一定时间会把线程回收回去,默认60s。
这种线程池存在一定的弊端,这里的弊端在于,第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建非常多的线程,甚至导致OOM。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newScheduledThreadPool
支持定时及周期性任务执行的线程池。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2.3 如何正确创建线程池
正确创建线程池的方法,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等等。
线程池里的线程数量设定为多少比较合适?
- CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
- 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于cpu核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:
- 线程数=CPU核心数 * (1+平均等待时间/平均工作时间)
3、常见线程池的特点对比
1)固定数量的线程池:FixedThreadPool
2)可缓存的线程池:CachedThreadPool
3)单一线程的线程池:SingleThreadPool
单线程的线程池:它只会用唯一的工作线程来执行任务,它的原理和FixedThreadPool是一样的,但是此时的线程数量被设置为了1.
4)定时任务类型的线程池:ScheduledThreadPool
4种线程池参数对比
阻塞队列分析
Q:FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue?
A:因为线程数已经固定了,只能通过无界队列来处理更多的任务。
Q:CachedThreadPool使用的Queue是SynchronousQueue?
A:不需要队列存储任务,直接交给新线程去执行。
Q:ScheduledThreadPool来说,它使用的是延迟队列DelayedWorkQueue
A:根据时间先后做延迟,符合它的应用场景。
WorkStealingPool是JDK1.8加入的。
这个线程池和之前的几种线程池都有很大的不同,不能保证执行顺序,其他线程会窃取子任务,如果使用递归,会有子任务产生,适用于这种情况,不过很少使用。
4、停止线程池的正确方法
1、shutdown 执行完正在执行的任务,并且执行完队列里的任务。就会停止,并且,执行完shutdown就不允许在向线程池中提交任务,会将其他任务拒绝。
2、isShutDown 是不是已经停止了,不是完全停止,是不是进入停止状态了。
3、isTerminated 返回是否线程池完全终止了。
4、awaitTermination 方法作用弱,等待一段时间判断线程池是否执行完毕,执行完毕返回true,没有执行完毕返回false.
5、shutdownNow 立刻把线程池关闭掉。用中断信号interrupt(优雅停止)。正在队列中等待的任务,会直接返回,这个方法有返回值,会返回一个列表。
5、线程池拒绝策略
5.1 拒绝时机
1、当Executor关闭时,提出拒绝新任务会被拒绝。
2、以及当Executor对最大线程和工作队列容量使用有边界并且已经饱和。
5.2 四种拒绝策略
- AbortPlicy: 直接抛出异常丢弃任务
- DiscardPolicy: 丢弃任务,默默丢弃,不会得到通知
- DiscardOldestPolicy: 丢弃最老的任务。
- CallerRunsPolicy: 让提交任务的线程去执行,这种策略比较聪明,两点好处,第一,任务无损失,第二(重要)降低提交速度,这是一种负反馈。这段时间主线程也会去执行完一些任务。
6、添加钩子方法
每个任务执行之前后,执行beforeExecute方法、afterExecute方法,来进行日志、统计等操作。
实例增加线程池暂停功能:
/**
* 演示每个任务执行前后放钩子函数,在执行任务前后做日志或者做统计可以通过这种方式做到这一点。
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
/**
* 是不是已经暂停
*/
private boolean isPaused;
/**
* 为暂停标记添加锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 配合lock使用的Condition
*/
private Condition unpaused = lock.newCondition();
// 以下四个构造方法都是自动生成的
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 任务执行之前的方法,这个方法就是线程池的钩子方法
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
// 判断是否暂停
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
/**
* 暂停线程池
*/
public void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
/**
* 恢复线程
*/
public void resume() {
lock.lock();
try {
isPaused = false;
// 唤醒全部的
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool =
new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 放到线程池中执行
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
// 暂停线程池
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
// 恢复线程池
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
7、线程池实现原理
7.1 线程池组成部分
- 线程池管理器
- 工作队列
- 任务队列
- 任务接口(Task)
任务队列要选择线程安全的BlockingQueue
7.2 Executor家族梳理
Executors是工具类。可去快速创建线程池。
创建的是ThreadPoolExecutor,返回的是ExecutorService,因为ExecutorService是ThreadPoolExecutor的父级接口。所以说ExecutorService也可以成为线程池。
7.3 线程池实现任务复用的原理
相同的线程执行不同的任务。下面是线程复用的核心方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 增加工作线程
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
随后进入addWorker方法。之后找到Worker类
w = new Worker(firstTask);
进入Worker类查看runWorker代码。在方法中把Runnable对象拿到,然后调用他的run方法,就可以实现相同的线程把,不同的人物run方法反复执行,而getTask就是从阻塞队列里取出来的,while循环就是整个worker不会停止,在循环中执行,执行完一个任务再执行下一个任务,并且相同的线程可以执行不同的任务。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拿到Task
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环中执行,去拿下一个
// 只要拿到的任务不为空,去执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
8、线程池状态和注意点
8.1 线程池状态
这五种状态在ThreadExecutorPool中都有定义。
- RUNNING:接收新任务并处理排队任务
- SHUTDOWN:不接受新任务,但是处理排队任务
- STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。
- TIDYING:中文是整洁,理解了中文就容易理解这个状态了:
所有任务都已终止,workerCount为零时,线程会转化到TIDYING状态,并将运行terminate()钩子方法。
- TERMINATED:terminate()运行完成
8.2 execute()方法分析
主要分三步处理:必须传入command,取到ctl获取到线程状态和线程数。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果小于核心线程,则添加worker
if (workerCountOf(c) < corePoolSize) {
// 添加任务,true的话就是判断在增加线程时判断是否少于核心的数量
// 如果传入false就表示增加线程时,去判断是否少于最大线程数
if (addWorker(command, true))
return;
c = ctl.get();
}
// 检查线程池状态看是否被终止,如果运行中就向队列中放任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次做检查,如果不是运行状态,则删掉任务并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程如果是0,可能抛异常停止了,也需要创建一个新的线程,防止没有线程执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 执行到这一步说明,队列也放不进去了,线程数也已经达到核心线程数了
// 增加到最大线程数如果增加到最大线程数,如果增加失败了,那么就执行拒绝的逻辑。
else if (!addWorker(command, false))
reject(command);
}
8.3 注意点
避免任务堆积
避免线程数过度增加。
排查线程泄露,可能因为任务结束不了。