1 线程池简介
为什么要使用线程池(Thread Pool)?
- 构造一个新的线程开销比较大,因为这涉及与操作系统的交互;
- 当出现高并发时,如果为每个并发请求都创建一个线程,甚至可能导致内存溢出。
- 因此,在大多数并发框架中都会使用线程池来管理线程
使用线程池管理线程主要有如下好处
- 降低资源消耗
通过复用已存在的线程和降低线程关闭的次数来尽可能降低系统性能损耗;
- 提升系统响应速度
通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;
- 提高线程的可管理性
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。
- 线程池的工作过程
- 线程池中包含许多准备运行的线程
- 为线程池提供一个Runnable,就会有一个线程调用其
**run()**
方法 - 当
**run()**
方法退出时,这个线程不会死亡,而是留在池中准备为下一个请求提供服务
2 自定义线程池
在了解Java提供的线程池时,首先了解一下自定义线程池,有助于后续的理解
自定义线程池结构
- 自定义线程池是一个生产者消费者模型,其中Thread Pool是消费者,而任务调用者是生产者
- 需要一个阻塞队列来平衡平衡生产者与消费者速率的差异
- 生产者不断将新的任务放到阻塞队列中,消费者则不断从阻塞队列中取出任务并处理
- 自定义线程池代码
```java
//拒绝策略
@FunctionalInterface
interface RejectPolicy
{ void reject(BlockingQueue queue, T task); }
@Slf4j(topic = “c.ThreadPool”)
class ThreadPool {
//任务队列
private final BlockingQueue
//生产者:线程集合
private final HashSet<Worker> workers = new HashSet<>();
//核心线程数,即线程池中最大线程数量
private final int coreSize;
//任务的超时时间
private final long timeout;
private final TimeUnit timeUnit;
private final RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, int queueCapacity, long timeout, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//因为workers是共享变量,且不是线程安全的,因此加锁保护
public void execute(Runnable task) {
synchronized (workers) {
//当线程池中的线程数量没有超过coreSize时,创建一个线程去执行任务
//否则任务加入阻塞队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{}", worker);
workers.add(worker);
worker.start();
} else {
taskQueue.tryPut(rejectPolicy, task);
}
}
}
//内部类,表示线程池中的线程
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//当task不为空,执行任务
//当task执行完毕,尝试从任务队列中获取任务继续执行
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
//已经没有任务了,移除当前线程
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
@Slf4j(topic = “c.BlockingQueue”)
class BlockingQueue
//锁
private final ReentrantLock lock = new ReentrantLock();
//生产者条件变量
private final Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private final Condition emptyWaitSet = lock.newCondition();
//阻塞队列最大容量
private final int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
//根据timeout的值判断是超时获取还是获取
public T take(long timeout, TimeUnit unit) {
//-1表示当没有任务时,线程会进入WAITING状态持续等待
if (timeout == -1)
//获取任务
return take();
//超时获取任务
return timed_take(timeout, unit);
}
//获取任务,如果没有任务则线程等待
private T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//带超时的获取任务,如果没有任务则线程撤销
private T timed_take(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
//超时了,返回null并退出
if (nanos <= 0) {
return null;
}
//awaitNanos返回的是剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//添加元素
public void put(T element) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列 {} ...", element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", element);
queue.addFirst(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public boolean timed_put(T element, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列 {} ...", element);
if (nanos <= 0)
return false;
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", element);
queue.addFirst(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
return true;
}
public void tryPut(RejectPolicy<T> rejectPolicy, T element) {
lock.lock();
try {
//判断队列是否满
if (queue.size() == capacity) {
//当任务队列满时,将队列和任务交给rejectPolicy处理
rejectPolicy.reject(this, element);
} else {
log.debug("加入任务队列 {}", element);
queue.addFirst(element);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
//获取阻塞队列当前大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
- **测试**
任务队列容量为10,线程池容量为2,需要执行15个任务,拒绝策略为超时等待
```java
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
//创建线程池并指定线程池容量、任务队列容量、任务队列空时线程的等待时间、任务队列满时的拒绝策略
ThreadPool threadPool = new ThreadPool(2, 10,
3000, TimeUnit.MILLISECONDS, (queue, task) -> {
//1. 死等
//queue.put(task);
//2. 超时等待
if (!queue.timed_put(task, 1000, TimeUnit.MILLISECONDS))
log.debug("等待超时,放弃{}", task);
//3. 调用者放弃任务执行
//log.debug("放弃{}", task);
//4. 调用者抛出异常
//throw new RuntimeException("任务执行失败" + task);
//5. 调用者自己执行任务
//task.run();
});
//创建15个任务
for (int i = 0; i < 15; i++) {
int temp = i;
threadPool.execute(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", temp);
});
}
}
}
- 上述代码输出
16:28:11 [main] c.ThreadPool - 新增 workerThread[Thread-0,5,main]
16:28:11 [main] c.ThreadPool - 新增 workerThread[Thread-1,5,main]
16:28:11 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@4ae63b4b
16:28:11 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@577d4f43
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@79efed2d
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@27ae2fd0
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@29176cc1
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@2f177a4b
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@4278a03f
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@147ed70f
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@61dd025
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@124c278f
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@15b204a1
16:28:11 [main] c.BlockingQueue - 加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@77167fb7
16:28:11 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@1fe20588 ...
16:28:12 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@1fe20588 ...
16:28:12 [main] c.TestPool - 等待超时,放弃TestPool$$Lambda$31/0x00000008000cb840@1fe20588
16:28:12 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@6973bf95 ...
16:28:13 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@6973bf95 ...
16:28:13 [main] c.TestPool - 等待超时,放弃TestPool$$Lambda$31/0x00000008000cb840@6973bf95
16:28:13 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@2ddc8ecb ...
16:28:14 [main] c.BlockingQueue - 等待加入任务队列 TestPool$$Lambda$31/0x00000008000cb840@2ddc8ecb ...
16:28:14 [main] c.TestPool - 等待超时,放弃TestPool$$Lambda$31/0x00000008000cb840@2ddc8ecb
16:28:16 [Thread-1] c.TestPool - 1
16:28:16 [Thread-0] c.TestPool - 0
16:28:16 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@77167fb7
16:28:16 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@15b204a1
16:28:21 [Thread-1] c.TestPool - 11
16:28:21 [Thread-0] c.TestPool - 10
16:28:21 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@61dd025
16:28:21 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@124c278f
16:28:26 [Thread-0] c.TestPool - 8
16:28:26 [Thread-1] c.TestPool - 9
16:28:26 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@147ed70f
16:28:26 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@4278a03f
16:28:31 [Thread-1] c.TestPool - 6
16:28:31 [Thread-0] c.TestPool - 7
16:28:31 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@2f177a4b
16:28:31 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@29176cc1
16:28:36 [Thread-1] c.TestPool - 5
16:28:36 [Thread-0] c.TestPool - 4
16:28:36 [Thread-1] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@27ae2fd0
16:28:36 [Thread-0] c.ThreadPool - 正在执行...TestPool$$Lambda$31/0x00000008000cb840@79efed2d
16:28:42 [Thread-1] c.TestPool - 3
16:28:42 [Thread-0] c.TestPool - 2
16:28:45 [Thread-0] c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
16:28:45 [Thread-1] c.ThreadPool - worker 被移除Thread[Thread-1,5,main]
3 ThreadPoolExecutor类
- ThreadPoolExecutor概述
ThreadPoolExecutor类是JDK提供的线程池实现,位于JUC包
1 线程池状态
- 状态表示
- ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
- 这样表示的目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作来改变线程池状态
- 上述信息存储在一个原子变量ctl中 ```java //ctl存储线程状态和线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs为int高3位,代表线程池状态;wc为int低29位,代表线程格式;该方法用于合并rs和wc private static int ctlOf(int rs, int wc) { return rs | wc; }
- **线程池状态表**
| **状态名** | **高3位** | **接收新任务** | **处理阻塞队列任务** | **说明** |
| --- | --- | --- | --- | --- |
| **RUNNING** | 111 | Y | Y | 线程池新创建时的状态,可以接受新任务,也可以处理阻塞队列任务 |
| **SHUTDOWN** | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
| **STOP** | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列中的任务 |
| **TIDYING** | 010 | <br /> | <br /> | 任务全部执行完毕,没有活动线程,即将进入终结 |
| **TERMINATED** | 011 | <br /> | <br /> | 终结 |
<br />
<a name="QmOra"></a>
## 2 线程池创建(参数与行为)
- 从自定义线程池一节可以了解到,线程池的创建需要配置很多参数,而通过ThreadPoolExecutor类创建线程池也是一样的,其有许多重载的构造方法
- 以参数最多的构造方法来理解创建线程池的**7个参数**
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
可以发现ThreadPoolExecutor的参数配置十分类似于自定义线程池的参数配置,具体的参数配置含义如下**int corePoolSize**
- 表示核心线程池的大小
- 线程池中的线程分为两类,核心线程和救急线程,这里指定的是核心线程的数量
- 当提交一个任务时,如果当前核心线程的个数没有达到corePoolSize,即使当前核心线程池有空闲的线程,也会创建新的核心线程来执行所提交的任务。如果当前核心线程池的线程个数已经达到了corePoolSize,则不再重新创建线程。
- 当线程数达到corePoolSize并且没有线程空闲,这时新加入的任务会被加入阻塞队列,直到有空闲的线程
- 当核心线程执行完任务后,也不会被销毁
- 核心线程默认“懒启动”,即线程池创建时没有线程
如果调用了prestartCoreThread()
或者prestartAllCoreThreads()
,线程池创建的时候所有的核心线程都会被创建并且启动
**int maximumPoolSize**
- 表示线程池能创建线程的最大个数,超过corePoolSize的线程称为救急线程
- 当选择了有界阻塞队列且阻塞队列已满,并且当前线程池线程个数没有超过maximumPoolSize,就会创建救急线程来执行任务
- 救急线程不像核心线程一样创建后就在线程池中长期存在,而是会在无事可做后等待一定时间,最后被销毁
**long keepAliveTime**
- 表示救急线程存活的时间,这里的存活时间指的是当线程无事可做时等待新任务的时间
- 如果当前线程池的线程个数已经超过了corePoolSize,并且线程空闲时间超过了keepAliveTime的话,就会将这些救急线程销毁,这样可以尽可能降低系统资源消耗。
**TimeUnit unit**
- 表示时间单位,为keepAliveTime指定时间单位。
**BlockingQueue<Runnable> workQueue**
- 表示阻塞队列
- BlockingQueue是一个接口,JDK提供了7种实现类,常用的4种如下
- ArrayBlockingQueue
ArrayBlockingQueue是用数组实现的有界阻塞队列,其容量在创建后不可改变
队列中的元素执行先进先出策略
- **LinkedBlockingQueue**
LinkedBlockingQueue是用链表实现的有界阻塞队列,其内元素执行先进先出策略
- **SynchronousQueue**
SynchronousQueue每个插入任务操作前必须等待线程池中的线程准备来阻塞队列中取任务
因此,SynchronousQueue实际上没有存储任何数据元素,换句话来说生产者与消费者直接交易
- **PriorityBlockingQueue**
支持优先级排序的阻塞队列
**ThreadFactory threadFactory**
- 表示创建线程的线程工厂
- ThreadFactory是一个接口,其内只有一个方法
Thread newThread(Runnable r)
实现ThreadFactory接口主要是为每个创建出来的线程设置更有意义的名字
默认调用Executors类中的静态工厂方法
_**defaultThreadFactory**_**()**
来创建一个默认的线程工厂public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//Executors类中的静态工厂方法
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
//默认线程工厂
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
**RejectedExecutionHandler handler**
- 表示拒绝策略
- 当线程池的阻塞队列已满并且所有线程都已开启(当前线程数量达到
maximumPoolSize
),说明当前线程池已经处于饱和状态了,那么就需要采用一种拒绝策略来处理所提交的任务 - RejectedExecutionHandler是一个接口,JDK提供了4种实现类(拒绝策略),如下
- AbortPolicy
直接拒绝所提交的任务,并抛出RejectedExecutionException异常,这是默认拒绝策略
- **CallerRunsPolicy**
利用调用者所在的线程来执行任务;
- **DiscardPolicy**
不处理直接丢弃掉任务
- **DiscardOldestPolicy**
丢弃掉阻塞队列中存放时间最久的任务,执行当前任务
这些实现类都是ThreadPoolExecutor下的内部静态类,也可以根据自己的需要实现自己的拒绝策略,只需要实现RejectedExecutionHandler接口即可
Executors类
- Executors类概述
- ThreadPoolExecutor类的构造方法参数众多,比较难掌握,因此JDK在Executors类中提供了众多工厂方法来方便的创建线程池
- Executors类的工程方法内部实际上调用的仍然是ThreadPoolExecutor类的构造方法,只不过简化了参数的配置
**ExecutorService newFixedThreadPool(int nThreads)**
源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 适用于任务量已知,相对耗时的任务
**ExecutorService newCachedThreadPool()**
源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点
- 核心线程数是0, 最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着线程池中的线程全部都是救急线程且60s后可以回收
- 整个线程池表现为线程数会根据任务量不断增长,没有上限;当任务执行完毕,空闲1分钟后释放线程。
- 适合任务数比较密集,但每个任务执行时间较短的情况
**ExecutorService newSingleThreadExecutor()**
源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特点
希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入阻塞队列排队。任务执行完毕,这唯一的线程也不会被释放
- 自己创建一个线程与该线程池的区别
- 自己创建一个单线程串行执行任务,如果任务执行失败那么线程将会结束而剩余任务无法完成
- 使用该线程池时如果其内线程意外终止,线程池还会新建一个线程,保证池的正常工作
3 Callable接口与Future接口
Callable和Future是位于JUC包下的接口
Callable接口概述
- Runnable接口用于封装线程所执行的任务,其内提供一个没有参数和返回值的方法
run()
- Callable接口也是用于封装线程所执行的任务,其内提供一个没有参数但是有返回值的方法
call()
- Callable接口源码如下
- Runnable接口用于封装线程所执行的任务,其内提供一个没有参数和返回值的方法
Callable接口的泛型参数是返回值的类型,如**Callable<Integer>**
表示一个返回Integer对象的任务
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
- Future接口概述
- Future用于保存异步计算的结果
- 一般来说,当我们执行一个长时间运行的任务时,使用Future就可以让我们暂时去处理其他的任务,等长任务执行完毕再返回其结果
- Future接口源码如下
Future接口的泛型参数是get()
方法返回值的类型
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
执行Callable的方式
- 可以使用FutureTask类来包裹一个Callable,由于FutureTask类实现了Future接口和Runnable接口,因此可以将FutureTask传给一个线程来执行Callable的
**call()**
方法
FutureTask相关源码
public class FutureTask<V> implements Runnable, Future<V> {
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// 简略版
public void run() {
...
Callable<V> c = callable;
V result;
result = c.call(); // 执行Callable的call方法
...
}
}
- 更常见的是直接将Callable传给线程池的方法,由线程池来执行任务
- 可以使用FutureTask类来包裹一个Callable,由于FutureTask类实现了Future接口和Runnable接口,因此可以将FutureTask传给一个线程来执行Callable的
Future接口API
Future的方法都是实例方法
**V future.get() throws InterruptedException, ExecutionException**
**V future.get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException**
该方法会阻塞当前线程,使当前线程进入WAITING状态或TIMED_WAITING状态,直到结果可用或超过了指定的时间,类似于保护性暂停模式
- 如果超过了指定的时间,将抛出TimeoutException异常
**boolean future.cancel(boolean mayInterruptIfRunning)**
尝试取消任务的运行
- 当任务还没有开始,则取消任务且不再开始
- 如果任务已经开始且参数mayInterruptIfRunning为true,则中断任务
**boolean future.isDone()**
如果任务结束(无论是正常完成、中途取消还是发生异常),都返回true
4 线程池提交任务相关API
所有API都是实例方法
**void pool.execute(Runnable command)**
- 向线程池提交任务并由线程池执行
**Future<?> pool.submit(Runnable task)**
**<T> Future<T> pool.submit(Callable<T> task)**
与execute()
方法功能类似,不同的是
**submit()**
方法既可以接收**Runnable**
类型也可以接收**Callable<T>**
类型,**execute()**
方法没有返回值,而**submit()**
方法会返回一个Future对象,通过它可以判断任务是否执行成功- 获取任务执行结果时可以调用
Future::get()
,该方法会阻塞当前线程直到任务完成 实例 ```java @Slf4j() public class Test { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3);
Future
future = pool.submit(new Callable () { @Override
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}
});
log.debug(“{}”, future.get()); } }
由于Future类的`get()`方法内部有保护性暂停策略,因此上述主线程打印时一定在子线程执行完毕后
- `**<T> List<Future<T>> pool.invokeAll(Collection<? extends Callable<T>> tasks)**`
- `**<T> List<Future<T>> pool.invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)**`
- 向线程池提交**tasks中所有任务**
- 可以选择向该方法传递超时参数,当执行超时时,线程池将取消执行tasks中剩余的任务
- `**<T> T pool.invokeAny(Collection<? extends Callable<T>> tasks)**`
- 提交tasks中所有任务,**哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消**
<a name="mO3fT"></a>
## 5 关闭线程池相关API
- 所有API都是**实例方法**
- `**void pool.shutdown()**`
- 将线程池状态切换为**SHUTDOWN**
- 此时不会接收新任务,但**已提交的任务会执行完**
- `**List<Runnable> pool.shutdownNow()**`
- 将线程池状态切换为**STOP**
- 此时不接收新任务,**中断正在执行的任务,返回阻塞队列中任务**
<a name="pq7bh"></a>
# 4 Fork-Join池
- **Fork-Join池概述**
- Fork-Join是JDK7加入的**新的线程池实现**
- Fork-Join体现的是一种**分治思想**,**适用于能够进行任务拆分的CPU密集型运算,I/O密集型任务仍然需要使用传统的线程池**
- 任务拆分是指将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。
跟**递归相关的一些计算**,如归并排序、斐波那契数列、都可以用分治思想进行求解<br />换句话说,**Fork-Join池可以自动地以并行的方式处理递归任务**
- **Fork-Join在分治的基础上加入了多线程**,可以**把每个任务的分解和合并交给不同的线程来完成**,进一步提升了运算效率
<br />
- **Fork-Join池的创建**
- Fork-Join池在JDK中的实现是**ForkJoinPool类**,其位于JUC包下
- ForkJoinPool的创建类似于ThreadPoolExecutor,同样有很多参数和重载的构造方法
- Fork-Join池默认会创建与**CPU核心数大小相同的线程池
**
- **Fork-Join池的任务的创建**
- 提交给Fork-Join池的任务,需要**继承**`**RecursiveTask<T>**`**类(任务有返回值)**或`**RecursiveAction**`**类(任务无返回值)**
- 具体执行的任务逻辑在上述两个类的`compute()`方法中,因此需要重写该方法
- `compute()`中的任务逻辑类似于递归任务的逻辑,稍有不同
- **Fork-Join池的使用**
**实例**
- 统计一个数组中有多少个元素满足大于0
- 我们可以使用二分的思想,将数组一分为二,分别对这两部分进行统计,再将结果相加
```java
public class Test {
public static void main(String[] args) throws Exception {
int[] a = {-1, 1, -1, 1, -1, 1, -1, 1, -1, 1};
Counter counter = new Counter(a, 0, a.length);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}
class Counter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 3; //任务拆分阈值
private final int[] values;
private final int from;
private final int to;
public Counter(int[] values, int from, int to) {
this.values = values;
this.from = from;
this.to = to;
}
@Override
protected Integer compute() {
if (to - from < THRESHOLD) {
//不再拆分,直接解决问题
int count = 0;
for (int i = from; i < to; i++) {
if (values[i] > 0) count++;
}
return count;
} else {
int mid = (from + to) / 2;
Counter first = new Counter(values, from, mid);
Counter second = new Counter(values, mid, to);
first.fork(); // 让一个线程执行该任务
second.fork();
return first.join() + second.join(); //合并结果
}
}
}