应用线程池
本章重点:
- 配置和调整线程池时使用的高级选项
- 使用任务执行框架的过程中需要注意的危险
- 提供了一些使用Executor的高级例子。
7.1任务与执行策略间的隐性耦合
Executor框架可以将任务的提交与执行策略进行解耦。但并非所有的任务都适合所有的执行策略。有很多类型的任务需要明确的指出一个执行策略:
依赖性任务。
- 一个好的任务是独立的,它们不依赖于时序,或者其他任务的结果与边界效应。但是,如果你提交给线程池的任务要依赖于其他的任务,你就隐式地给执行策略带来了约束,必须仔细地管理执行策略以避免活跃度问题。
采用线程限制的任务
- 单线程化的Executor相比于任意的线程池,可以对同步作出更强的承诺,它可以保证任务不会并发的执行,可以把对象限制在任务线程中,任务线程在访问该对象时,不需要同步。但是如果你将Executor从一个单线程化的改为一个线程池的话,就会失去安全性。
对响应事件敏感的任务
- 如果将一个长时间运行的任务提交到单线程化的Executor中,或者将多个长时间运行的任务提交给一个只包含少量线程的线程池中,会削弱Executor管理的服务的响应性。
如果将耗时的与短期的任务混合在一起,除非线程池够大,否则就会有“塞车”的风险。如果提交的任务要依赖于其他的任务,除非线程池是无限的,否则就有产生死锁的风险。
一些任务具有这样的特征:
- 需要或者排斥某种特定的执行策略。
- 对其他任务具有依赖性的任务,就会要求线程池足够大,来保证它所依赖的任务不必排队或者不被拒绝。
- 采用线程限制的任务需要顺序地执行。
7.1.1线程饥饿死锁
线程饥饿死锁:
在一个大的线程池中,如果所有线程执行的任务都阻塞在线程池中,等待着仍然处于同一工作队列中的其他任务。只要池任务开始了**无限期的阻塞**,其**目的是等待一些资源或条件**,此时**只有另一个池任务的活动才能使那些条件成立**,比如等待返回值或者另一个任务的边界效应。
无论何时,当提交了一个非独立的Executor任务时,要明确出现线程饥饿死锁的可能性。
7.1.2耗时操作
如果任务由于过长的时间周期而阻塞,那么即时不可能出现死锁,线程池的响应性也会变得很差。
这时,就可以限定任务等待资源的时间,不要无限制地等待下去。
大多数平台类库中的阻塞方法,都同时有限时和非限时两个版本:
比如Thread.join、BlockingQueue.put、CountDownLatch.await、Selector.select。
如果等待超时,你可以把任务标识为失败,终止它或者把它重新放回队列。
7.2定制线程池的大小
线程池合理的长度取决于未来提交的任务类型和所部署系统的特征。
池的长度应该由某种配置机制来提供,或者利用Runtime.availableProcessors的结果(得到的是CPU的数目)。动态地进行计算。
要做的仅仅是避免“过大”和“过小“。
7.3配置ThreadPoolExecutor
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的工厂newCachedThreadPool…….等方法返回的。
如果默认的执行策略不能满足需求,可以通过构造函数实例化一个THread’PoolExecutor,自己定制它。(具体构造函数参照源码)
构造函数之一:
public ThreadPoolExecutor(int corePollSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactoryRejectedExecutionHandler handler){.....}
7.3.1线程的创建与销毁
核心池大小、最大池大小、存活时间 共同管理者线程的创建与销毁。
线程池的实现试图维护池的大小:即使没有任务执行。
核心池大小:也等于池的大小,直到工作队列充满前,池都不会创建更多的线程。
最大池大小:可同时活动的线程数的上限。
存活时间:如果一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,如果当前池的大小超过了核心池的大小,线程池会终止它。
newFixedThreadPool工厂:为请求的池设置了核心池的大小和最大池的大小,而且池永远不会超时。
newCachedThreadPool工厂:将最大池的大小设置为Integer.MAX_VALUE,核心池大小设置为0,超时设置为一分钟,这样创建出来的可无限扩大的线程池,会在需求量减少的情况下减少线程数量。
其他组合可以使用限时的ThreadPoolExecutor构造函数来实现。
7.3.2管理队列任务
ThreadPoolExecutor允许你提供一个BlockingQueue来持有等待执行的任务。
任务排队有3种基本方法:无限队列、有限队列、同步移交。
newFixedThreadPool和newSingleThreadExecutor默认使用的是一个无限的LinkedBlockingQueue。
一个稳妥的资源管理策略是使用有限队列,比如ArrayBlockingQueue(FIFO)、有限的LinkedBlockingQueue(FIFO)、PriorityBlockingQueue(优先级)。
当队列已满后,新的任务怎么办?
有很多的饱和策略可以处理这个问题。
对于一个有界队列,队列的长度与池的长度必须一起调节。一个大队列加一个小池,可以控制内存和CPU的使用。
对于庞大或者无限的池,可以使用SynchronousQueue,完全绕开队列,将任务直接从生产者移交给工作者线程。
SynchronousQueue:不是一个真正的队列,而是一种管理直接在线程间移交信息的机制,为了把一个元素放入到SynchronousQueue中,必须有另一个线程正在等待移交的任务。如果没有这样一个线程,只要当前池的大小还小于最大值,ThreadPoolExecutor就会创建一个新的线程。否者根据饱和策略,任务会被拒绝。
只有当池是无限的,或者可以接受任务被拒绝,SynchronousQueue才是一个有实际价值的选择。
newCachedThreadPool工厂就使用了SynchronousQueue。
只有当任务彼此独立时,有限线程池或者有限工作队列的使用才是合理的。
倘若任务之间相互依赖,有限的线程池或队列就有可能引起线程饥饿死锁。使用一个无限的池配置可以避免这类问题,就像newCachedThreadPool。
7.3.3饱和策略
当一个有限队列充满后,饱和策略开始起作用。
ThreadPoolExecutor的饱和策略可以通过调用**setRejectedExecutionHandler**来修改。(如果任务**提交到一个已经被关闭的Executor时**,**也会用到饱和策略**)
JDK提供了几种不同的RejectedExecutionHandler实现:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
AbortPolicy:
**中止策略**。**execute抛出未检查的RejectedExecutionException**,调用者可以捕获这个异常,然后编写能满足自己需求的处理代码。
DiscardPolicy:
**遗弃策略。默认放弃这个任务.**
DiscardOldPolicy:
遗弃最旧策略。策略选择丢弃的任务,是本应该接下来就执行的任务,该策略还会尝试去重新提交新任务。(如果工作队列是优先级队列,那么“遗弃最旧的”丢弃的刚好是优先级最高的元素)
CallerRunsPolicy:
**调用者运行策略**。既不丢弃哪个任务,也不会抛出任何异常,它会**把一些任务推回到调用者那里**,以此**减缓新任务流**。它**不会在线程池中执行最新提交的任务**,但是会在一个**调用了execute的线程中执行**。
CallerRUnsPolicy理解:
如果是一个主线程调用了execute,那么当所有的池线程都被占用,而且工作队列已充满后,下一个任务会在主线程中执行,主线程调用execute执行这个任务,因为这将花费一些时间,所以主线程在一段时间内不能提交任何任务,给了工作者线程时间来追赶进度。
示例
//创建一个可变长的线程池,使用受限队列和“调用者运行”饱和策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
N_THREADS,
N_THREADS,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY),
executor.setRejectedExecutionHanlder(new ThreadPoolExecutor.CallerRunsPolicy()));
如果没有预置的饱和策略来阻塞execute,使用Semaphore也可以实现这个效果。
Semaphore会限制任务的注入率,设置Semaphore的限制范围等于在池的大小上加上你希望允许的可以排队的任务数量。
示例:
public class BoundedExecutor{
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound){
this.exec = exec;
this semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command){
semaphore.acquire();
try{
exec.execute(new Runnnable() {
public void run(){
try{
command.run();
}finally{
semaphore.release();
}
}
});
}catch(RejectedExecutionException e){
semaphore.release();
}
}
}
7.3.4线程工厂
无论如何,线程池需要创建一个线程,都要通过一个线程工厂。
默认的线程工厂创建一个新的、非后台的线程,并没有特殊的配置。
详细指明一个线程工厂,能允许你定制池线程的配置信息。ThreadFactory只有唯一的方法:newThread,它会在线程池需要创建一个新线程时调用。
源码:
//ThreadFactory接口
public interface ThreadFactory{
Thread newThraed(Runnable r)
}
示例:
//定制的线程工厂
public class MyThreadFactory implements ThreadFactory{
private final String poolName;
public MyThreadFactory(String poolName){
this.poolName = poolName;
}
public Thread newThread(Runnable runnable){
return new MyAppThread(runnable, poolName);
}
}
//自定义的线程基类
public calss MyAppThread extends Thread{
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger cteated = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r){this(r, DEFAULT_NAME);}
public MyAppThread(Runnabel runnable, String name){
super(runnable, name + "-" + created.incrementAndGet());
setUncaugthExcptionHandler(
new Thread.UncaughtExceptionHandler(){
public void uncaughtException(Thread t, Throwable e){
log.log(Level.SEVERE, "UNCAUGHT in Thread" + t.getName(), e);
}
});
}
public void run(){
boolean debug = debugLifecycle;
if(debug) log.log(Level.FINE, "Created" + getName());
try{
alive.incrementAndGet();
super.run();
}finally{
alive.decrementAndGet();
if(debug) log.log(Level.FINE, "Exiting" +getName());
}
}
public static int getThreadsCreated(){return created.get();}
public static int getThreadsAlive(){return alive.get();}
public static boolean getDebug(){return debuLifecycle;}
public static void setDebug(boolean b){debugLifecycle = b;}
}
7.3.5构造后再定制ThreadPoolExecutor
大多数通过构造函数传递给Thread’PoolExecutor的参数(比如核心池大小、最大池大小、存活时间、线程工厂、拒绝处理器),都可以在创建后通过setters进行修改。
如果Executor是通过Executors中的某一个工厂方法(除newSingleThreadExecutor以外)创建的,首先把结果转型为ThreadPoolExecutor,然后访问setters方法。
Executors 中包括一个名为unconfigurableExecutorService的工厂方法,它返回一个现有的ExecutorService,并对它进行包装,它只暴露出ExecutorService的方法,因此不能进行进一步的配置。
newSingleThreadExecutor就是通过这样的方式返回一个封装过的ExecutorService。因为它并不承若并发地执行任务,如果一些具有误导性的代码试图增加一个单线程化executor的池大小,这会破坏语气性。
如果你不希望执行策略被修改,也可以使用unconfigurableExecutorService来包装自己的Executor。
7.4扩展ThreadPoolExecutor
ThreadPoolExecutor的设计是可扩展的,它提供了几个“钩子”让子类去覆写:beforeExecute、afterExecute、terminate。
执行任务的线程会调用钩子函数beforeExecute、afterExecute,用它们添加日志、时序、监视器、统计信息收集的功能。
如果beforeExecute抛出一个RuntimeException,任务将不被执行,afterExecute也不被执行。
无论任务正常从run返回,还是抛出一个异常,afterExecute都被调用。
如果任务完成后抛出一个Error,则afterExecute不会被调用。
terminated钩子会在线程池完成关闭动作后调用。(也就是所有任务都已经完成并且所有工作者线程也已经关闭后)。terminated可以用来释放Executor在生命周期里分配到的资源。
7.4.1示例:给线程池加入统计信息
public class TimingThreadPool extends ThreadPoolExecutor{
private final ThreadLoacl<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
//记录任务的数量
priavte final AtomicLong numTasks = new AtomicLong();
//记录任务花费的总时长
priavte final AtomicLong totalTime = new AtomicLong();
//打印日志,记录开始时间
protected void beforeExecute(Thread t, Runnable r){
super.beforeExecute(t,r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
//记录日志总数和任务花费总时长,打印日志告诉用户当前任务花费时长
protected void afterExecute(Runnable r, Throwable t){
try{
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
}finally}{
super.afterExecute(r, t);
}
}
//打印日志告诉用户每个任务所花费的平均时长。
protected void terminated(){
try{
log.info(String.format("Terminated: avg time=%dns",totalTime.get() / numTasks.get()));
}finally{
super.terminated();
}
}
}
7.5并行递归算法
如果一个循环的每次迭代都是独立的,并且我们不必等待所有的迭代都完成后再一起处理,那么我们可以使用Executor把一个顺序的循环转化为并行的循环。
如果你需要提交一个任务集并等待它们完成,那么可以使用ExecutorService.invokeAll。当所有结果都可用后,可以使用CompletionService来获取结果。
示例:
//转换前:顺序执行
void processSequentially(List<Element> elements){
for(Element e : elements){
process(e);
}
}
//转换后:并行执行
void processInParailel(Executor exec, List<Element> elements){
for(final Element e : elements){
exec.execute(new Runnable(){
public void run(){process(e);}
});
}
}
//当每个迭代彼此独立,并且完成顺序体中每个迭代的工作,意义都足够重大,足以弥补管理一个新任务的开销时,这个顺序循环是适合并行化的。
递归算法的内部通常会存在顺序循环:
//普通深度优先遍历:在每个节点上执行计算,把结果放入一个容器中
public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
for(Node<T> n : nodes){
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
//并行化深度优先遍历:不是在访问节点时进行计算,而是为每个节点都提交了一个任务来完成计算
public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){
for(final Node<T> n : nodes){
exec.execute(new Runnable(){
public void run(){
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
//等待并行运算的结果
public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException{
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
7.5.1示例:谜题框架
public interface Puzzle<P,M>{
//P 代表了位置,M道标了移动类
P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves(P position);
P move(P position,M move);
}
static class Node<P,M>{
final P pos;
final M move;
final Node<P,M> prev;
Node(P pos,M move, Node<P,M> prev){...}
List<M> asMoveList(){
List<M> soultion = new LinkedList<M>;
for(Node<P,M> n = this; n.move != null; n = n.prev){
solution.add(0, n.move);
}
return solution;
}
}
//顺序化的谜题解决者
public class SequentialPuzzleSolver<P,M>{
private final Puzzle<P,M> puzzle;
private final Set<P> seen = new HashSet<P>;
public SequentialPuzzleSolver(Puzzle<P,M> puzzle){
this.puzzle = puzzle;
}
public List<M> solve(){
P pos = puzzle.initialPosition();
return search(new Node<P,M>(pos,null,null));
}
private List<M> search(Node<P,M> node){
if(!seen.contains(node.pos)){
seen.add(node.pos);
if(puzzle.isGoal(node.pos)){
return node.asMoveList();
}
for(M move : puzzle.legalMoves(node.pos)){
P pos = puzzle.move(nodes.pos, move);
Node<P, M> child = new Node<P, M>(pos, move, node);
List<M> result = search(child);
if(result != null){
return result;
}
}
}
return null;
}
}
//并发版的谜题解决者
public class SequentialPuzzleSolver<P,M>{
private final Puzzle<P,M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P,Boolean> seen;
//用于存储方案,保证其可见性
final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P,M>>();
public List<M> solve() throws InterruptedException{
try{
P p = puzzle.initialPosition();
//并发执行寻找解决方案
exec.execute(newTask(p,null,null));
//阻塞,直到发现一个方案
Node<P,M> solnNode = solution.getValue();
return (solnNode == null) ? null : solnNode.asMoveList();
}finally{
exec.shutdown();
}
}
protected Runnable newTask(P p,M m,Node<P,M> n){
return new SolverTask(p, m, n);
}
class SolverTask extends Node<P, M> implements Runnable {
...
public void run(){
if(solution.isSet() || seen.putIfAbsent(pos, true) != null) return;//已找到一个解决方案,或者该位置曾经到达过
if(puzzle.isGoal(pos)){
solution.setValue(this);
}else{
for (M m : puzzle.legalMoves(pos)){
exec.execute(newTask(puzzle.move(pos,m),m,this));
}
}
}
}
}
//能够感知任务不存在的解决者
public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M>{
....
//用于感知任务是否存在的标志
private final AtomicInteger taskCount = new AtomicInteger(0);
protected Runnabl newTask(P p,M m,Node<P,M> n){
return new CountingSolverTask(p,m,n);
}
class CountingSolverTask extends SolverTask{
CountingSolverTask(P pos, M move, Node<P,M> prev){
super(pos,move,prev);
//每实例化一次,加1
taskCount.incrementAndGet();
}
public void run(){
try{
super.run();
}finally{
//当计数器减为0时,就将结果方案设置为null
if(taskCount.decrementAndGet() == 0){
solution.setValue(null);
}
}
}
}
}
总结
对于并发执行的任务,Executor框架是强大且灵活的。
Executor提供了大量可调节的选项:
创建和关闭线程的策略,
处理队列任务的策略
处理过剩任务的策略
并且提供了几个钩子函数用于扩展它的行为。
一些类型的任务需要特定的执行策略,而一些调节参数组合在一起后可能产生以外的结果。
