一、Executor&ExecutorService简介
虽然大多数情况下,我们更喜欢将Executor或ExecutorService直接称之为“线程池”,但是事实上这两个接口只定义了任务(Runnable/Callable)被提交执行的相关接口。
public interface Executor {
void execute(Runnable command);
}
ExecutorService接口继承自Executor接口,并且提供了更多用于任务提交和管理的一些方法,比如停止任务的执行等
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
二、ThreadPoolExecutor详解
线程池主要解决了两个不同的问题:由于任务的异步提交,因此在执行大量的异步任务时可以提升系统性能;另外它还提供了限制和管理资源的方法,包括线程池中的工作线程、线程池任务队列中的任务,除此之外,每一个ThreadPoolExecutor还维护了一些基本的统计信息,比如已经完成的任务数量等。
public abstract class AbstractExecutorService implements ExecutorService {
...
}
2.1、ThreadPoolExecutor的主要方法
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
...
}
2.1、ThreadPoolExecutor的简单使用
public static void main(String[] args) throws Exception{
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(3,4,30,
TimeUnit.SECONDS,new ArrayBlockingQueue<>(4)
,Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
for (int i=0;i<7;i++){
threadPoolExecutor.execute(
()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
);
}
System.out.println(threadPoolExecutor.getActiveCount());
System.out.println(threadPoolExecutor.getQueue().isEmpty());
threadPoolExecutor.shutdown();
}
2.2、构造参数详解
- corePoolSize:用于指定在线程池中维护的核心线程数量,即使当前线程池中的核心线程不工作,核心线程的数量也不会减少(在JDK1.6版本及以后可以通过设置允许核心线程超时的方法allowCoreThreadTimeOut来改变这种情况)。
- maximumPoolSize:用于设置线程池中允许的线程数量的最大值。
- keepAliveTime:当线程池中的线程数量超过核心线程数并且处于空闲时,线程池将回收一部分线程让出系统资源,该参数可用于设置超过corePoolSize数量的线程在多长时间后被回收,与unit配合使用。
- TimeUnit:用于设定keepAliveTime的时间单位。
- workQueue:用于存放已提交至线程池但未被执行的任务。
- ThreadFactory:用于创建线程的工厂,开发者可以通过自定义ThreadFactory来创建线程,比如,根据业务名为线程命名、设置线程优先级、设置线程是否为守护线程等、设置线程所属的线程组等。
- RejectedExecutionHandler:当任务数量超过阻塞队列边界时,这个时候线程池就会拒绝新增的任务,该参数主要用于设置拒绝策略。
ThreadPoolExecutor的构造比较复杂,除了其对每一个构造参数都有一定的要求之外(比如,不能为null),个别构造参数之间也存在一定的约束关系。
- TimeUnit、workQueue、ThreadFactory、RejectedExecutionHandler不能为null。
- corePoolSize可以设置为0,但不能小于0,并且corePoolSize不能大于线程的最大数量(maximumPoolSize)。
2.4、执行任务方法详解
程池被成功构造后,其内部的运行线程并不会立即被创建,ThreadPoolExecutor的核心线程将会采用一种Lazy(懒)的方式来创建并且运行,当线程池被创建,并且首次调用执行任务方法时才会创建,并且运行线程。
▪ 线程池核心线程数量大于0,并且首次提交任务时,线程池会立即创建线程执行该任务,并且该任务不会被存入任务队列之中。
▪ 当线程池中的活跃(工作)线程大于等于核心线程数量并且任务队列未满时,任务队列中的任务不会立即执行,而是等待工作线程空闲时轮询任务队列以获取任务。
▪ 当任务队列已满且工作线程小于最大线程数量时,线程池会创建线程执行任务,但是线程数量不会超过最大线程数,下面将上一段代码的最大循环数修改为N(最大线程数+任务队列size),会发现同时有maximumPoolSize个线程在工作。
▪ 当任务队列已满且线程池中的工作线程达到最大线程数量,并且此刻没有空闲的工作线程时,会执行任务拒绝策略,任务将以何种方式被拒绝完全取决于构造ThreadExecutorPool时指定的拒绝策略。若将执行任务的循环最大次数更改为15,再次执行时会发现只有14个任务被执行,第15个任务被丢弃(这里指定的拒绝策略为丢弃)。
▪ 若线程池中的线程是空闲的且空闲时间达到指定的keepAliveTime时间,线程会被线程池回收(最多保留corePoolSize数量个线程),当然如果设置允许线程池中的核心线程超时,那么线程池中所有的工作线程都会被回收。
2.5、ThreadFactory详解
在ThreadExecutorPool的构造参数中提供了一个接口ThreadFactory,用于定义线程池中的线程(Thread),我们可以通过该接口指定线程的命名规则、优先级、是否为daemon守护线程等信息
public interface ThreadFactory {
Thread newThread(Runnable r);
}
2.6、拒绝策略RejectedExecutionHandler
当线程池中没有空闲的工作线程,并且任务队列已满时,新的任务将被执行拒绝策略,在ThreadPoolExecutor中提供了4种形式的拒绝策略,当然它还允许开发者自定义拒绝策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
- DiscardPolicy:丢弃策略,任务会被直接无视丢弃而等不到执行,因此该策略需要慎重使用。
- AbortPolicy:中止策略,在线程池中使用该策略,在无法受理任务时会抛出拒绝执行异常RejectedExecutionException(运行时异常)。
- DiscardOldestPolicy:丢弃任务队列中最老任务的策略(最早进入任务队列中的任务并不一定是最早(老)的,比如,优先级阻塞队列会根据排序规则来决定将哪个任务放在队头)。
- CallerRunsPolicy:调用者线程执行策略,前面的三种拒绝策略要么会在执行execute方法时抛出异常,要么会将任务丢弃。该策略不会导致新任务的丢失,但是任务会在当前线程中被阻塞地执行,也就是说任务不会由线程池中的工作线程执行。
2.7、ThreadPoolExecutor的其他方法
public class ThreadPoolExecutor extends AbstractExecutorService {
}
三、ScheduledExecutorService详解
ScheduledExecutorService继承自ExecutorService,并且提供了任务被定时执行的特性,我们可以使用ScheduledExecutorService的实现ScheduledThreadPoolExecutor来完成某些特殊的任务执行,比如使某任务根据设定的周期来运行,或者在某个指定的时间来执行任务等。
3.1、
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
/**
* 该方法是一个one-shot方法(只执行一次),
* 任务(callable)会在单位(unit)时间(delay)后被执行,
* 并且立即返回ScheduledFuture,
* 在稍后的程序中可以通过Future获取异步任务的执行结果。
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 该方法同样是一个one-shot方法(只执行一次),
* 任务(runnable)会在单位(unit)时间(delay)后被执行,虽然也会返回ScheduledFuture,
* 但是并不会包含任何执行结果,因为Runnable接口的run方法本身就是无返回值类型的接口方法,
* 不过可以通过该Future判断任务是否执行结束。
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 任务(command)会根据固定的速率(period,时间单位为unit)在时间(initialDelay,时间单位为unit)后不断地被执行。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//该方法与前一个方法比较类似,只不过该方法将以固定延迟单位时间的方式执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
}
案例一
public static void main(String[] args) throws Exception{
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);
ScheduledFuture<String> schedule = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("===");
return "123";
}, 5, TimeUnit.SECONDS);
System.out.println(schedule.get());
scheduledThreadPoolExecutor.shutdown();
}
案例二
public static void main(String[] args) throws Exception{
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);
ScheduledFuture<?> schedule = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("===");
}, 5, TimeUnit.SECONDS);
System.out.println(schedule.get());
scheduledThreadPoolExecutor.shutdown();
}
案例三:scheduleAtFixedRate和scheduleWithFixedDelay比较类似
public static void main(String[] args) throws Exception{
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor=new ScheduledThreadPoolExecutor(1);
/**相差2秒
* Sat Aug 28 16:08:38 CST 2021
* Sat Aug 28 16:08:40 CST 2021
* Sat Aug 28 16:08:42 CST 2021
* Sat Aug 28 16:08:44 CST 2021
* Sat Aug 28 16:08:46 CST 2021
* Sat Aug 28 16:08:48 CST 2021
*/
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date());
}, 5, 2, TimeUnit.SECONDS);
/** 相差4s
* Sat Aug 28 16:07:33 CST 2021
* Sat Aug 28 16:07:37 CST 2021
* Sat Aug 28 16:07:41 CST 2021
* Sat Aug 28 16:07:45 CST 2021
* Sat Aug 28 16:07:49 CST 2021
* Sat Aug 28 16:07:53 CST 2021
*/
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date());
}, 5, 2, TimeUnit.SECONDS);
}
四、关闭ExecutorService
如果ExecutorService在接下来的程序执行中将不再被使用,则需要将其关闭以释放工作线程所占用的系统资源,ExecutorService接口定义了几种不同形式的关闭方式
4.1、有序关闭(shutdown)
shutdown提供了一种有序关闭ExecutorService的方式,当该方法被执行后新的任务提交将会被拒绝,但是工作线程正在执行的任务以及线程池任务(阻塞)队列中已经被提交的任务还是会执行,当所有的提交任务都完成后线程池中的工作线程才会销毁进而达到ExecutorService最终被关闭的目的。
该方法是立即返回方法,它并不会阻塞等待所有的任务处理结束及ExecutorService最终的关闭,因此如果你想要确保线程池彻底被关闭之后才进行下一步的操作,那么这里可以配合另外一个等待方法awaitTermination使当前线程进入阻塞等待ExecutorService关闭结束后再进行下一步的动作。
4.2、立即关闭(shutdownNow)
shutdownNow方法首先会将线程池状态修改为shutdown状态,然后将未被执行的任务挂起并从任务队列中排干,其次会尝试中断正在进行任务处理的工作线程,最后返回未被执行的任务,当然,对一个执行了shutdownNow的线程池提交新的任务同样会被拒绝。
4.3、组合关闭(shutdown&shutdownNow)
五、Executors详解
要创建一个ExecutorService,尤其是ThreadPoolExecutor是比较复杂的,Java并发包中提供了类似于工厂方法的类,用于创建不同的ExecutorService,当然还包括拒绝策略、ThreadFactory等