Java 语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
如果可以复用一组线程:
┌─────┐ execute ┌──────────────────┐│Task1│─────────>│ThreadPool │├─────┤ │┌───────┐┌───────┐││Task2│ ││Thread1││Thread2││├─────┤ │└───────┘└───────┘││Task3│ │┌───────┐┌───────┐│├─────┤ ││Thread3││Thread4│││Task4│ │└───────┘└───────┘│├─────┤ └──────────────────┘│Task5│├─────┤│Task6│└─────┘...
就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。
线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
Java 标准库提供了 ExecutorService 接口表示线程池,它的用法如下:
// 创建固定大小的线程池:ExecutorService executor = Executors.newFixedThreadPool(3);// 提交任务:executor.submit(task1);executor.submit(task2);executor.submit(task3);executor.submit(task4);executor.submit(task5);
ExecutorService 的实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池,线程空闲 60 秒之后终止线程;
- SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors这个类中(类似工厂模式)。
以 FixedThreadPool 为例,看看线程池的执行逻辑:
import java.util.concurrent.*;public class Main {public static void main(String[] args) {// 创建一个固定大小的线程池:ExecutorService es = Executors.newFixedThreadPool(4);for (int i = 0; i < 6; i++) {es.submit(new Task("" + i));}// 关闭线程池:es.shutdown();}}class Task implements Runnable {private final String name;public Task(String name) {this.name = name;}@Overridepublic void run() {System.out.println("start task " + name);try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("end task " + name);}}
output: (输出情况有一定的随机性)
start task 0start task 1start task 2start task 3end task 0start task 4end task 2start task 5end task 1end task 3end task 4end task 5
观察执行结果,一次性放入 6 个任务,由于线程池只有固定的 4 个线程,因此,前 4 个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。根据上面的结构也可以看出,结束 task 0 后,就立即开始了 task 4 的任务执行(当然,这有一定的随机性)。
线程池在程序结束的时候要关闭:
shutdown(): 等待正在执行的任务先完成,然后再关闭。shutdownNow(): 立刻停止正在执行的任务。awaitTermination(): 等待指定的时间让线程池关闭。
把创建线程池改为 CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以 6 个任务可一次性全部同时执行。
可以查看创建 CachedThreadPool 的 Executors.newCachedThreadPool() 源码:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
如果想把线程池的大小限制在 4~10 个之间动态调整,可以这样:
int min = 4;int max = 10;ExecutorService es = new ThreadPoolExecutor(min, max,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ScheduledThreadPool
有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用 ScheduledThreadPool。放入 ScheduledThreadPool 的任务可以定期反复执行。
创建一个 ScheduledThreadPool 仍然是通过 Executors 类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1 秒后执行一次性任务:ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2 秒后开始执行定时任务,每3秒执行:ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 2秒后开始执行定时任务,以3秒为间隔执行:ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
FixedRate 是指任务总是以固定时间间隔触发,不管任务执行多长时间:
│░░░░ │░░░░░░ │░░░ │░░░░░ │░░░├───────┼───────┼───────┼───────┼────>│<─────>│<─────>│<─────>│<─────>│
FixedDelay 是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
│░░░│ │░░░░░│ │░░│ │░└───┼───────┼─────┼───────┼──┼───────┼──>│<─────>│ │<─────>│ │<─────>│
使用 ScheduledThreadPool 时,我们要根据需要选择执行一次、FixedRate 执行还是 FixedDelay 执行。
思考下面的问题:
- 在 FixedRate 模式下,假设每秒触发,如果某次任务执行时间超过 1 秒,后续任务会不会并发执行?
如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
个人理解:这里的并发是指,到达指定定时时间之后就能执行下一个任务,但是在 FixedRate 模式下,必须等待任务执行完成(即使当前任务已经超过了等待时间),才能执行下一个任务。所以这里不支持并发执行。
- 如果任务抛出了异常,后续任务是否继续执行?
如果任务的任何执行遇到异常,则将禁止后续任务的执行。
Java 标准库还提供了一个 java.util.Timer 类,这个类也可以定期执行任务,但是,一个 Timer 会对应一个Thread,所以,一个 Timer 只能定期执行一个任务,多个定时任务必须启动多个 Timer,而一个 ScheduledThreadPool 就可以调度多个定时任务,所以,我们完全可以用 ScheduledThreadPool 取代旧的 Timer。
小结
JDK 提供了 ExecutorService 实现了线程池功能:
- 线程池内部维护一组线程,可以高效执行大量小任务;
Executors提供了静态方法创建不同类型的ExecutorService;- 必须调用
shutdown()关闭ExecutorService; ScheduledThreadPool可以定期调度多个任务。
