在线程中执行任务

串行执行

在应用程序中可以通过多种策略来调度任务,其中一些策略能更好的利用潜在的并发性。最简单的策略就是在单个线程中串行的执行各项任务,但是串行处理机制通常都无法提供高吞吐率或快速响应性。

显式的为任务创建线程

在正常负载情况下,为每个任务分配一个线程的方法能提升串行执行的性能,只要请求的到达速率不超出服务器的请求处理能力,那么这种方法可以同时带来更快的响应性和更高的吞吐率。

无限制创建线程的不足

在生产环境中,这种方法存在一些缺陷,尤其是创建大量线程时:
线程生命周期的开销非常高:每次请求都要创建一个新的线程耗费大量的计算资源。
资源消耗:空闲的线程会占用很多内存。
稳定性:大量的创建线程会超出服务器承受的范围造成oom异常。

Executor框架

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

Executor是一个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它将任务的提交过程与执行过程解耦开来,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

线程池

在线程池中执行任务比为每个任务分配一个线程优势更多。可重用现有线程而不是创建新线程;当请求到达时工作线程通常已经存在,而不必等待创建线程而耗费时间,从而提高程序响应。通过适当调整线程池大小,可以创建足够多的线程以便处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
可通过Executors中的静态工厂方法来创建线程池:

newFixerThreadPool

  1. //创建一个固定长度的线程池,该线程池重用固定数量的线程并且在共享的无界队列中运行。
  2. //在任何时候,最多nThreads个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,
  3. //它们将在队列中等待,直到线程可用。如果任何线程在关闭之前由于执行期间的故障而终止,新的线程将取代它。
  4. public static ExecutorService newFixedThreadPool(int nThreads) {
  5. return new ThreadPoolExecutor(nThreads, nThreads,
  6. 0L, TimeUnit.MILLISECONDS,
  7. new LinkedBlockingQueue<Runnable>());
  8. }

newSingleThreadExecutor

  1. //创建只有一个线程的线程池来执行任务,如果这个线程异常结束,会创建新的线程来替代
  2. //能保证按照任务在队列中的顺序来串行执行(FIFO,LIFO,优先级)
  3. public static ExecutorService newSingleThreadExecutor() {
  4. return new FinalizableDelegatedExecutorService
  5. (new ThreadPoolExecutor(1, 1,
  6. 0L, TimeUnit.MILLISECONDS,
  7. new LinkedBlockingQueue<Runnable>()));
  8. }

newScheduledThreadPool

  1. //创建固定长度的线程池,而且以延迟或者定时的方式执行任务
  2. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  3. return new ScheduledThreadPoolExecutor(corePoolSize);
  4. }

newCachedThreadPool

  1. //可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,
  2. //可以添加新的线程,线程池的规模不存在任何限制。
  3. public static ExecutorService newCachedThreadPool() {
  4. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  5. 60L, TimeUnit.SECONDS,
  6. new SynchronousQueue<Runnable>());
  7. }

等等。。。

Executor的生命周期

Executor的实现通常会创建线程来执行任务,但jvm只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么jvm将无法结束。
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法,同时还有一些用于任务提交的方法。

  1. public interface ExecutorService extends Executor {
  2. //终止任务
  3. void shutdown();
  4. //暴力终止任务
  5. List<Runnable> shutdownNow();
  6. //任务是否终止
  7. boolean isShutdown();
  8. //如果关闭后所有任务都已完成,则返回true
  9. boolean isTerminated();
  10. boolean awaitTermination(long timeout, TimeUnit unit)
  11. throws InterruptedException;
  12. //提交一个带有返回结果的任务
  13. <T> Future<T> submit(Callable<T> task);
  14. <T> Future<T> submit(Runnable task, T result);
  15. Future<?> submit(Runnable task);
  16. //执行一组任务
  17. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  18. throws InterruptedException;
  19. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  20. long timeout, TimeUnit unit)
  21. throws InterruptedException;
  22. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  23. throws InterruptedException, ExecutionException;
  24. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  25. long timeout, TimeUnit unit)
  26. throws InterruptedException, ExecutionException, TimeoutException;
  27. }

Callable与Future

Callable与Future能返回携带结果的任务,许多任务实际上是存在延迟的计算,比如执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,它认为主入口点将返回一个值,并可能抛出一个异常。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

  1. public interface Callable<V> {
  2. V call() throws Exception;
  3. }
  1. public interface Future<V> {
  2. //尝试取消任务
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. //如果此任务在正常完成之前取消,则返回true
  5. boolean isCancelled();
  6. //如果此任务完成,则返回true,完成可能是由于正常终止,例外或取消。
  7. boolean isDone();
  8. //等待任务返回结果
  9. V get() throws InterruptedException, ExecutionException;
  10. //最多等待多长时间返回结果
  11. V get(long timeout, TimeUnit unit)
  12. throws InterruptedException, ExecutionException, TimeoutException;
  13. }