任务执行

大多数并发应用程序都是围绕任务进行管理的。

任务:抽象、离散的工作单元

5.1在线程中执行任务

围绕执行任务来管理应用程序时:

  • 清晰任务边界(理想:任务的独立的活动)

应用程序应该在负荷过载时平缓地劣化,而不应该负载一高就简单地以失败告终。

大多数服务器应用程序都选择了下面这个自然的任务边界:单独的客户请求

5.1.1顺序地执行任务

  1. //接受达到80端口的Http请求。
  2. public class SingleThreadWebServer {
  3. public static void main(String[] args) throws IOException {
  4. ServerSocket serverSocket = new ServerSocket(80);
  5. while (true){
  6. Socket accept = serverSocket.accept();
  7. //处理请求的细节
  8. handleRequest(accept);
  9. }
  10. }
  11. }

顺序化处理几乎不能为服务器应用程序提供良好的吞吐量或快速的响应性。

也有特例:

任务的数量很少但生命周期很长

服务器只服务于唯一的用户

大多数服务器应用程序不是以顺序化执行这种方式工作的

5.1.2显示地为任务创建线程

  1. public class SingleThreadWebServer {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocket serverSocket = new ServerSocket(80);
  4. while (true) {
  5. final Socket accept = serverSocket.accept();
  6. Runnable runnable = () -> {handleRequest(accept);};
  7. new Thread(runnable).start();
  8. }
  9. }
  10. }

主线程为每个连接都创建一个新线程处理请求,而不是在主循环内部处理它:

  • 执行任务的负载已经脱离了主线程,这让主线程能更快地重新开始下一个连接。提高了响应性。
  • 并行处理任务,多个请求可以同时得到服务。程序的吞吐量得到了提高。
  • 任务处理代码必须是线程安全的,因为有多个任务会并发地调用它。

中等强度的负载水平下,“每任务每线程”是对顺序化执行的良好改进。

5.1.3无限制创建线程的缺点

  • 线程生命周期的开销

    • 线程的创建和关闭不是“免费”的。
  • 资源消耗量

    • 活动线程会消耗系统资源,尤其是内存。如果可运行的线程数多于可用的处理器数,线程将会空闲,大量的空闲线程会占用更多的内存,给垃圾回收器带来压力。
  • 稳定性

    • 应该限制可创建线程的数目,限制的数目依不同平台而定,同时也受到JVM的启动参数、Thread的构造函数中请求的栈大小等因素的影响,以及底层操作系统的限制。

应该设置一个范围来限制你的应用程序可以创建的线程数,然后彻底地测试你的应用程序

对于一个服务器,我们希望它具有高可用性,而且在高负载下可以平缓地劣化。

5.2Executor框架

任务是逻辑上的工作单元,线程是使任务异步执行的机制。

有界队列可以防止程序过载而耗尽内存,线程池为线程管理提供了同样的好处。

作为Executor框架的一部分,java.util.concurrent 提供了一个灵活的线程池实现。

  1. //Executor接口
  2. public interface Executor{
  3. void execute(Runnable command);
  4. }

Executor基于生产者-消费者模式

提交任务的执行者是生产者(产生待完成的工作单元)

执行任务的线程是消费者(消耗掉这些工作单元)

如果你要实现一个生产者-消费者的设计,使用Executor通常是最简单的方式。

5.2.1示例:使用Executor实现Web Server

  1. //只要替换一个不同的Executor实现,就可以改变服务器的行为。
  2. //但是用于提交任务的代码会不断地扩散到整个程序,难以集中管理。
  3. public class TaskExecutionWebServer {
  4. private static final int NTHREADS = 100;
  5. private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
  6. public static void main(String[] args) throws IOException {
  7. ServerSocket serverSocket = new ServerSocket(80);
  8. while (true){
  9. final Socket connection = serverSocket.accept();
  10. Runnable task = () ->{handleRequest(connection);};
  11. exec.execute(task);
  12. }
  13. }
  14. }
  15. //为每一个请求都创建一个新线程。
  16. public class ThreadPerTaskExecutor implements Executor{
  17. public void execute(Runnable r){
  18. new Thread(r).start();
  19. }
  20. }

5.2.2执行策略

what、where、when、how

  • 任务在什么(what)线程中执行?
  • 任务以什么(what)顺序执行(FIFO、LIFO、优先级)
  • 可以有多少个(how many)任务并发执行?
  • 可以有多少个(how many)任务进入等待执行队列?
  • 如果系统过载,需要放弃一个任务,应该挑选哪一个(which)任务?另外,如何(how)通知应用程序知道这一切呢?
  • 在一个任务的执行前与结束后,应该做什么(what)处理?
  1. 将任务的提交与任务的执行策略规则进行分离。
  2. 无论何时当你看到这种形式的代码:
  3. new Thread(runnable).start();
  4. 并且你希望获得一个更加灵活的执行策略时,请考虑使用Executor代替Thread

5.2.3线程池

  1. 线程池管理一个工作者线程的同构池。**线程池是与工作队列紧密绑定的**。
  2. **工作队列**:持有所有等待执行的任务。
  3. **工作者线程**:从工作队列中获取下一个任务,执行它,然后回来继续等待另一个线程。

线程池优点:

  • 重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消线程创建、消亡产生的开销。
  • 在请求到达时,工作者线程通常已经存在,用于创建线程的等待时间并不会延任务的执行,提高了响应性。
  • 通过适当地调整线程池的大小,可以防止过多的线程相互竞争资源,导致应用消耗内存。

可以通过调用Executors中的某个静态工厂方法来创建一个线程池:

  • newFixedThreadPool:创建一个定长的线程池每提交一个任务就创建一个线程,直到达到池的最大长度,之后线程池会保持长度不再变化。
  • newCacheThreadPool:创建一个可缓存的线程池。如果当前线程池的长度超过了处理的需求时,它可以回收空闲的线程。当需求增加时,它可以灵活的添加新的线程
  • newSingleThreadExecutor:创建一个单线程化的executor,它只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它。它可以保证任务依照顺序执行
  • newScheduledThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行

由于服务器没有创建数以千计的线程去争夺CPU和内存资源,所以服务器仍然会更加平缓地劣化。

5.2.4Executor的生命周期

  1. JVM会在所有线程全部终止后才退出。因此,如果无法正确关闭Executor,将会阻止JVM的结束。
  2. 为了解决执行服务的生命周期问题。ExecutorService接口扩展了Executor,并添加了一些用于生命周期管理的方法。
  1. public interface ExecutorService extends Executor{
  2. void shutdown();
  3. List<Runnable> shutdownNow();//启动一个强制关闭过程,尝试取消所有运行中的任务和排在队列中尚未开始的任务
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. //判断ExecutorService是否已经终止。
  7. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedExecption;
  8. //等待ExecutorService到达终止状态
  9. //.....其他用于任务提交的便利方法
  10. }

ExecutorService暗示了生命周期有3种状态:运行、关闭、终止。

关闭后提交到ExecutorService中的任务,会被拒绝执行处理器(ThreadPoolExecutor提供的,ExecutorService接口中的方法并不提供此功能)

拒绝执行处理其可能只是简单地放弃任务,也可能会引起execute抛出一个未检查的RejectedExecutionException。

一旦所有任务全部完成后,会进入终止状态

示例

  1. public class LifecycleWebServer {
  2. private final ExecutorService exec = ....;
  3. public void start() throws IOException{
  4. ServerSocket serverSocket = new ServerSocket(80);
  5. while (!exec.isShutdown()){
  6. try {
  7. final Socket conn = serverSocket.accept();
  8. exec.execute(() -> {handleRequest(conn);});
  9. }catch (RejectedExecutionException e){
  10. if (!exec.isShutdown()){
  11. log("...",e);
  12. }
  13. }
  14. }
  15. }
  16. public void stop(){exec.shutdown();}
  17. void handleRequest(Socket connection){
  18. Request req = readRequest(connection);
  19. if (isShutdownRequest(req)){
  20. stop();
  21. }else {
  22. dispatchRequest(req);
  23. }
  24. }
  25. }

5.2.5延迟的、并具周期性的任务

Timer工具管理任务的延迟执行,但是Timer存在一些缺陷,因此可以使用ScheduledThreadPoolExecutor 作为替代品。

如果你需要构建自己的调度服务,仍然可以使用类库提供的DelayQueue,它是BlockingQueue的实现,为SecheduledThreadPoolExecutor提供了调度功能。

DelayQueue管理着一个包含Delayed对象的容器每个Delayed对象都与一个延迟时间相关联:只有在元素过期后,DelayQueue才让你能执行take操作获取元素

从DelayQueue中返回的对象将依据它们所延迟的时间进行排序

5.3寻找可强化的并行性

大多数服务器应用程序中,都存在这样一个明显的任务边界:单一的客户请求。

但是,合理的任务又是并非如此显而易见。即时就是服务器应用程序一个单一的客户请求内部仍然会有可以进一步细化的并行性。

5.3.1可携带结果的任务:Callable和Future

Callable:可以返回值,并为可能抛出的异常预先做好了准备。

Executors包含了一些工具方法,可以把其他类型的任务封装成一个Callable。比如Runnable和java.security.PrivilegedAction。

Executor执行的任务周期有4个阶段创建、提交、开始、完成

总可以取消已经提交但尚未开始的任务。对于已经开始的任务,只有它们响应中断,才可以取消。已完成的任务取消没有任何影响。

Future描述了任务的生命周期,并提供了相关的方法来获得任务的结果、取消任务以及检验任务是否已经完成还是被取消

源码

public interface Callable<V> {
    V call() throws Exception;
}
//可以使用Callable<Void>表示无返回值的任务。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
//如果任务已经完成,get会立即返回或者抛出一个Exception。
//如果任务没有完成,get会阻塞直到它完成。
//如果任务抛出异常,get会将该异常封装为ExecutionException,然后重新抛出。可以使用getCause重新获取被封装的原始异常。
//如果任务被取消,get会抛出CancellationException。

创建一个描述任务的Future

ExecutorService中**所有的submit方法都返回一个Future**。因此你可以**将一个Runnable或者一个Callable提交给executor**,然后得到一个Future。

也可以显示地为给定的Runnable或Callable**实例化一个FutureTask**。

将Runnable或Callable提交到Executor:是为了建立一个安全发布。

同理,设置Future结果值的行为:也是为了建立一个安全发布。

5.3.2并行运行异类任务的局限性

对于试图并行执行连续的异类任务,以获得性能重大提升的方法,需谨慎考虑。

如果没有在相似的任务之间发现更好的并行性,那么并行方法应有的好处会逐渐减少。

示例:

比如你为两个工作者划分了两个任务A、B,但是A执行花费时间是B的10倍,那么整个过程仅仅加速了9%而已。(还没有算任务协调上的开销)
大量相互独立且同类的任务进行并行处理,会将程序的任务量分配到不同的任务中,这样才能真正获得性能的提升。

5.3.3CompletionService:当Executor遇见BlockingQueue

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整时可以获得这个结果

ExecutorCompletionService是实现CompletionService接口的一个类,并将计算结果委托给一个Executor。

ExecutorCompletionService的实现

它在构造函数中创建一个BlockingQueue,用它去保存完成的结果。计算完成时会调用FutureTask中的done方法,当提交了一个任务后,首先把这个任务包装成一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中。
ExecutorService executor;
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);

completionService.submit(new Callable<imageData>() {
    public ImageData call(){
        return imageInfo.downloadImage();
    }
});


Future<ImageData> f = completionService.take();

5.3.4为任务设置时限

有时候如果一个活动无法在某个确定时间内完成,那么它的结果就失效了,此时程序可以放弃该活动。

在预定时间内执行任务的主要挑战是:

  • 确保在得到答案,或者发现无法从任务中获取结果的这一过程中所花费的时间,不会比预定的时间长。

    • Future.get的限时版本:在结果准备好后立即返回,如果在时限内没有准备好,就会抛出TimeoutException。
  • 当任务超时后应该能停止它们,这样才不会为计算一个无用的结果而浪费计算资源。

    • 如果一个限时的Future.get抛出TimeoutException,可以通过Future取消任务。
Future<A> f = ......;
long time;
A a;
try{
   a = f,get(time, NANOSECONDS);
}catch(ExecutionException e){

}catch(TimeoutException e){
    f.cancel(true);
}

如果要提交多任务:

可以使用限时版本的invokeAll,将多个任务提交到一个ExecutorService,并获得其结果。

private class QuoteTask implements Callable<TravelQuote>{
    .......
    public TravelQuote call() throws Exception{
        ......;
    }
}

public List<TravelQueote> getRankedTravelQuotes(long time,TimeUnit unit){
    .....
    List<QuoteTask> tasks = new ArrayList<QuoteTask>();
    List<Future<TravelQuote>>exec.invokeAll(tasks,time,unit);
}

总结

  • 围绕任务的执行来构造应用程序,可以简化开发,便于同步。
  • Executor框架有助于将任务的提交与执行策略之间进行解耦,同时还支持很多不同类型的执行策略。
  • 清晰每一个任务的边界,把应用程序分解为不同的任务。