Java 多线程 - 结构化并发编程

一. 任务执行

1. 在线程中执行任务

任务通常是一些抽象的且离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构。

  • 串行执行任务
  1. 按照顺寻执行任务
  • 显示为任务创建线程
  1. 1) 任务处理线程从主线程分离出来,使得主线程不用等待任务完毕就可以去快速地去响应下一个请求,以达到高响应速度;
  2. 2) 任务处理可以并行,支持同时处理多个请求;
  3. 3) 任务处理是线程安全的,因为每个任务都是独立的。
  • 无限制创建线程的不足
  1. 1) 线程的生命周期的开销很大:每创建一个线程都是要消耗大量的计算资源;
  2. 2) 资源的消耗:活跃的线程要消耗内存资源,如果有太多的空闲资源就会使得很多内存资源浪费,导致内存资源不足,多线程并发时就会出现资源强占的问题;
  3. 3) 稳定性:可创建线程的个数是有限制的,过多的线程数会造成内存溢出;

2. Executor 框架

  1. 任务是一组逻辑工作单元,而线程则是任务异步执行的机制。为了让任务更好地分配到线程中执行,java.util.concurrent提供了Executor框架。
  2. Executor 基于生产者-消费者模式:
  3. 提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。
  4. 通过使用 Executor,将请求处理任务的提交与任务的实际执行解耦开来。
  • 执行策略
  1. 1) 通过将任务的提交与执行解耦,再执行策略中定义了任务执行的 "What, Where, When, How":
  2. a) 在什么(What) 线程中执行任务
  3. b) 任务按照什么(What) 顺序执行(FIFOLIFO、优先级)
  4. c) 有多少个(How Many) 任务能并发执行
  5. d) 如果系统过载而需要拒绝一个任务,那么应该选择哪一个(whick) 任务? 另外, 如果(How) 通知应用程序有任务被拒绝
  6. c) 在执行一个任务之前或之后, 应该进行哪些(What) 动作?
  7. 2) new Thread(runnable).start()
  8. 请用更灵活的 Executor 替代 Thread
  • 线程池
  1. 1) 线程池,是指管理一组同构工作线程的资源池
  2. a) 在线程池中执行任务比「为每一个任务分配一个线程」优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
  3. b) 另外一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性
  4. 2) Executors中的静态工厂方法提供了一些线程池:
  5.  a) newFixedThreadPool:固定长度的线程池
  6.  b) newCachedThreadPool:可缓存的线程池,线程池的规模不存在限制
  7.  c) newSingleThreadExecutor:单线程的线程池, 创建单个工作线程执行任务, 线程异常结束, 会创建另外一个线程代替. 此线程可以确保任务按照顺序串行执行(FIFOLIFO、优先级)
  8.  d) newScheduledThreadPool:固定长度的线程池,且以延迟或定时的方式来执行任务
  • Executor 的生命周期
  1. 1) ExecutorService 3 种状态
  2. 运行、关闭、终止
  3. 2) ExecutorService提供了两种方法关闭方法:
  4. a) shutdown: 平缓的关闭过程,即不再接受新的任务,等到已提交的任务执行完毕后关闭进程池
  5. b) shutdownNow: 立刻关闭所有任务,无论是否再执行
  • 延迟任务与周期任务
  1. 1) 建议使用 ScheduledThreadPoolExecutor 来代替 Timer
  2. 2) Java 中提供 Timer 来执行延时任务和周期任务,但是 Timer 类有以下的缺陷:
  3.  a) Timer 只会创建一个线程来执行任务,如果有一个 TimerTask 执行时间太长,就会影响到其他 TimerTask 的定时精度;
  4.  b) Timer 不会捕捉 TimerTask 未定义的异常,所以当有异常抛出到 Timer 中时,Timer 就会崩溃,而且也无法恢复,就会影响到已经被调度但是没有执行的任务,造成“线程泄露”。

3. 可利用的并行性

  1. 1) Executor Runnable 的形式描述任务,但是 Runnable 有很大的局限性:
  2.  没有返回值,只是执行任务;
  3.  不能处理被抛出的异常;
  4. 2) 为了弥补以上的问题,Java 中设计了另一种接口 Callable
  5. 3) Executor 框架下所执行的任务都有四种生命周期:
  6.  创建、提交、开始、完成;
  7. 对于一个已提交但还没有开始的任务,是可以随时被停止;但是如果一个任务已经如果已经开始执行,就必须等到其相应中断时再取消;当然,对于一个已经执行完成的任务,对其取消任务是没有任何作用的。
  • Callable
  1. Callable 支持任务有返回值,并支持异常的抛出。如果希望获得子线程的执行结果,那 Callable 将比 Runnable 更为合适。
  2. 无论是 Callable 还是 Runnable 都是对于任务的抽象描述,即表明任务的范围:有明确的起点,并且都会在一定条件下终止。
  • Future
  1. 1) Future 类表示任务生命周期状态,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等,其命名体现了任务的生命周期只能向前不能后退。
  2. 2) Future 类提供方法查询任务状态外,还提供 get 方法获得任务的返回值,但是 get 方法的行为取决于任务状态:
  3.  a) 如果任务已经完成,get 方法则会立刻返回;
  4.  b) 如果任务还在执行中,get 方法则会拥塞直到任务完成;
  5.  c) 如果任务在执行的过程中抛出异常,get 方法会将该异常封装为 ExecutionException 中, 并可以通过 getCase 方法获得具体异常原因;
  6. 3) 如果将一个 Callable 对象提交给 ExecutorServicesubmit 方法就会返回一个 Future 对象,通过这个 Future 对象就可以在主线程中获得该任务的状态,并获得返回值。
  7. 4) 除此之外,可以显式地把 Runnable Callable 对象封装成 FutureTask 对象,FutureTask 不光继承了 Future 接口,也继承 Runnable 接口,所以可以直接调用 run 方法执行。
  • CompletionService
  1. 1) CompletionService 可以理解为 Executor BlockingQueue 组合:当一组任务被提交后,CompletionService 按照任务完成的顺序将任务的 future 象放入队列中。
  2. 2) 使用 CompletionService 来一个一个获取完成任务的 Future 对象外,还可以调用 ExecutorSerive invokeAll() 方法。
  3. 3) invokeAll 支持限时提交一组任务(任务的集合),并获得一个 Future 数组。invokeAll 方法将按照任务集合迭代器的顺序将任务对应的 Future 对象放入数组中,这样就可以把传入的任务(Callable)和结果(Future)联系起来。当全部任务执行完毕,或者超时,再或者被中断时,invokeAll 将返回 Future 数组。
  4. 4) invokeAll 方法返回时,每个任务要么正常完成,要么被取消,即都是终止的状态了。

二. task - 取消与关闭

  1. Java 没有提供任何机制来安全地终止线程(虽然 Thread.stop 和 suspend 方法提供了这样的机制,但由于存在缺陷,因此应该避免使用
  2. 中断:一种协作机制,能够使一个线程终止另一个线程的当前工作
  3. 立即停止会使共享的数据结构处于不一致的状态,需要停止时,发出中断请求,被要求中断的线程处理完他当前的任务后会自己判断是否停下来

1. task - 任务取消

  1. 1) 有时候我们希望提前结束任务,或者因为用户取消了操作,需要终止线程, 任务取消有很多原因:
  2. a) 用户主动取消: 如点击图形界面的取消按钮等。
  3. b) 有时间限制的: 操作当超过时间限制时,需要结束任务。
  4. c) 应用程序事件: 当一个任务找到解决方案时,需要结束操作。
  5. d) 错误: 当任务执行过程中出现不可恢复的错误,需要结束任务。
  6. e) 关闭: 当一个程序被关闭时,需要做一些清理操作。
  7. 2) 示例: 设置 canceled 变量为取消标志,每次执行前检查
  8. private volatile boolean canceled;
  9. @Override
  10. public void run() {
  11. BigInteger p = BigInteger.ONE;
  12. while (!canceled){
  13. p = p.nextProbablePrime();
  14. synchronized (this) { //同步添加素数
  15. primes.add(p);
  16. }
  17. }
  18. }
  19. 注意:这是一个有问题的取消方式,若线程阻塞在add操作后,那么即使设置了取消状态,它也不会运行到检验阻塞状态的代码,因此会永远阻塞
  • task - 中断(Interrupt)
  1. 1) 对于中断操作的正确理解是
  2. a) 它并不会真正地中断一个正在运行的线程,而是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。
  3. b) waitsleepjoin 都是严格处理这些请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。
  4. c) 通常中断是实现取消的最合理的方式
  5. 2) 示例
  6. public class Thread {
  7. // 每个线程都有一个 boolean 类型的中断状态
  8. // 当线程中断时,这个线程的中断状态将被设置为 true
  9. private boolean interrupted;
  10. // 中断目标线程
  11. // 调用 interrupt 并不意味着立即停止目标线程, 而只是窗体请求中断信息
  12. public void interrupt() { ... }
  13. // 返回目标线程的中断状态
  14. public boolean isInterrupted() { ... }
  15. // 清除当前线程的中断状态,并返回它之前的值), 这也是清楚中断状态的唯一方法
  16. public static boolean interrupted() { ... }
  17. }
  18. 显示的检测中断 !Thread.currentThread().isInterrupted() 后推出
  19. 阻塞方法中抓到 InterruptedException 后退出
  • task - 中断策略(Interrupt strategy)
  1. 1) 最合理的中断策略, 是以线程级(Thread-Level)、服务级(Service-Level) 的取消操作
  2. 2) 当检查到中断请求时, task 并不需要放弃所有操作,因为可以等待任务处理完毕再退出
  3. 3) 由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。
  • task - 中断响应
  1. 当调用中断阻塞函数时(Thread.sleepBlockingQueue.put), 2 种策略可用于处理 InterruptedException:
  2. 1) 传递异常(throws InterruptedException), 从而使你的方法也成为可中断的阻塞方法。
  3. 2) 恢复中断状态,从而使调用栈的上层代码能够对其进行处理。(Thread.currentThread().interrupt();)
  4. 只有实现了线程中断策略的代码才可以屏蔽中断请求,常规任务中不应该屏蔽中断请求。
  • task - Future 取消任务
  1. boolean cancel(boolean mayInterruptIfRunning);
  2. 1) 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败,返回false
  3. 2) 调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行
  4. 3) 如果任务已经执行,mayInterruptIfRunning 参数决定了是否向执行任务的线程发出 interrupt 操作
  5. 4) Future.get 抛出 InterruptedException TimeoutException 时, 如果不再需要 task 结果, 可以调用 Future.cancel 来取消任务
  6. 5) 示例
  7. private static ExecutorService service = Executors.newCachedThreadPool();
  8. public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
  9. Future<?> task = service.submit(r);
  10. try {
  11. task.get(timeout, unit);
  12. } catch (InterruptedException | ExecutionException | TimeoutException e) {
  13. } finally {
  14. task.cancel(true);
  15. }
  16. }
  • task - 不可中断的阻塞
  1. Java 库中许多可阻塞的方法都是通过提前返回或者抛出 InterruptedException 异常来响应中断请求。但并非所有可阻塞方法都能响应中断,例如一下:
  2. 1) Java.io 包中的同步 Socket I/O
  3. 虽然 InputStream OutputStream 中的 read write 等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行 read write 等方法而被阻塞的线程抛出一个 SocketException
  4. 2) Java.io 包中的同步 I/O
  5. a) 当中断一个正在 InterruptibleChannel 上等待的线程时,将抛出 ClosedByInterruptedException 并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出 ClosedByInterruptException)。
  6. b) 当关闭一个 InterruptibleChannel 时,将导致所有在链路操作上阻塞的线程抛出 AsynchronousCloseException。大多数标准的 Channel 都实现了 InterruptibleChannel
  7. 3) Selector 的异步 I/O
  8. 如果一个线程在调用 Selector.select 方法(在java.nio.channels中)时阻塞了,那么调用 close wakeup 方法会使线程抛出 ClosedSelectorException 并提前返回。
  9. 4) 获取某个锁。
  10. 如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在 Lock 类中提供了 lockInterruptibly 方法,该方法允许在等待一个锁的同时仍能响应中断。
  • task - newTaskFor 封装非标准的取消

  • 计时运行

2. ExecutorService - 停止基于线程的服务

  • ExecutorService - 停止线程服务
  1. 1) 应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。
  2. a) 服务退出 -> 线程需要结束无法通过抢占式的方法来停止线程,因此它们需要自行结束
  3. b) 除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等
  4. c) 线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池
  5. d) 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序不能拥有工作者线程,因此应用程序不能直接停止工作者线程。
  6. 2) 服务应该生命周期方法关闭它自己以及他拥有的线程
  7. a) 要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法
  8. b) ExecutorService 提供的 shutdown(), shutdownNow()
  9. 3) 示例: 日志服务
  10. // LogWriter就是一个基于线程的服务,但不是一个完成的服务
  11. public class LogWriter {
  12. //日志缓存
  13. private final BlockingQueue<String> queue;
  14. private final LoggerThread logger;//日志写线程
  15. private static final int CAPACITY = 1000;
  16. public LogWriter(Writer writer) {
  17. this.queue = new LinkedBlockingQueue<String>(CAPACITY);
  18. this.logger = new LoggerThread(writer);
  19. }
  20. public void start() { logger.start(); }
  21. //应用程序向日志缓存中放入要记录的日志
  22. public void log(String msg) throws InterruptedException {
  23. queue.put(msg);
  24. }
  25. //日志写入线程,这是一个多生产者,单消费者的设计
  26. private class LoggerThread extends Thread {
  27. private final PrintWriter writer;
  28. public LoggerThread(Writer writer) {
  29. this.writer = new PrintWriter(writer, true); // autoflush
  30. }
  31. public void run() {
  32. try {
  33. while (true)
  34. writer.println(queue.take());
  35. } catch(InterruptedException ignored) {
  36. } finally {
  37. writer.close();
  38. }
  39. }
  40. }
  41. }
  42. // 可以中断阻塞的take()方法停止日志线程(消费者线程),但生产者没有专门的线程,没办法取消
  43. //日志服务,提供记录日志的服务,并有管理服务生命周期的相关方法
  44. public class LogService {
  45. private final BlockingQueue<String> queue;
  46. private final LoggerThread loggerThread;// 日志写线程
  47. private final PrintWriter writer;
  48. private boolean isShutdown;// 服务关闭标示
  49. // 队列中的日志消息存储数量。我们不是可以通过queue.size()来获取吗?
  50. // 为什么还需要这个?请看后面
  51. private int reservations;
  52. public LogService(Writer writer) {
  53. this.queue = new LinkedBlockingQueue<String>();
  54. this.loggerThread = new LoggerThread();
  55. this.writer = new PrintWriter(writer);
  56. }
  57. //启动日志服务
  58. public void start() {
  59. loggerThread.start();
  60. }
  61. //关闭日志服务
  62. public void stop() {
  63. synchronized (this) {
  64. // 为了线程可见性,这里一定要加上同步,当然volatile也可,但下面方法还需要原子性,所以这里就直接使用了synchronized,但不是将isShutdown定义为volatile
  65. isShutdown = true;
  66. }
  67. // 向日志线程发出中断请求
  68. loggerThread.interrupt();
  69. }
  70. // 供应用程序调用,用来向日志缓存存放要记录的日志信息
  71. public void log(String msg) throws InterruptedException {
  72. synchronized (this) {
  73. // 如果应用程序发出了服务关闭请求,则不存在接受日志,而是直接抛出异常,让应用程序知道
  74. if (isShutdown)
  75. throw new IllegalStateException("日志服务已关闭");
  76. // 由于queue是线程安全的阻塞队列,所以不需要同步(同步也可但并发效率会下降,所以将它放到了同步块外)。
  77. // 但是这里是的 操作序列是由两个操作组成的:即先判断isShutdown,再向缓存 中放入消息,如果将queue.put(msg)放在同步外,则在多线程环境中,LoggerThread中的 queue.size() == 0 将会不准确,所以又要想queue.put不同步,又要想queue.size()计算准确,所以就使用了一个变量reservations专用来记录缓存中日志条数,这样就即解决了同步queue效率低的问题,又解决了安全性问题,这真是两全其美
  78. // queue.put(msg);
  79. ++reservations;//存储量加1
  80. }
  81. queue.put(msg);
  82. }
  83. private class LoggerThread extends Thread {
  84. public void run() {
  85. try {
  86. while (true) {
  87. try {
  88. synchronized (LogService.this) {
  89. // 由于 queue 未同步,所以这里不能使用queue.size
  90. //if (isShutdown && queue.size() == 0)
  91. // 如果已关闭,且缓存中的日志信息都已写入,则退出日志线程
  92. if (isShutdown && reservations == 0)
  93. break;
  94. }
  95. String msg = queue.take();
  96. synchronized (LogService.this) {
  97. --reservations;
  98. }
  99. writer.println(msg);
  100. } catch (InterruptedException e) {
  101. }
  102. }
  103. } finally {
  104. writer.close();
  105. }
  106. }
  107. }
  108. }
  109. 注意:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提提交消息的权利
  • ExecutorService - 服务关闭
  1. 1) shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完
  2. 2) shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法,无法保证能够停止正在处理的任务线程,但是会尽力尝试),并暂停处理正在等待的任务,并返回等待执行的任务列表。
  3. 3) ExecutorService 已关闭,再向它提交任务时会抛 RejectedExecutionException 异常
  • ExecutorService - 毒丸对象(Poison Pill)
  1. 1) 另一种关闭生产者 - 消费者服务的方式就是使用 "毒丸" 对象, 是指一个放在队列上的对象: 当得到这个对象时, 立即停止.
  2. 2) 在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作
  • ExecutorService - 只执行一次的服务
  1. 1) 如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的 Executor 来简化服务的生命周期管理,其中该 Executor 的生命周期是由这个方法来控制的.
  2. 2) 示例
  3. boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
  4. throws InterruptedException {
  5. ExecutorService exec = Executors.newCachedThreadPool();
  6. // 这里不能使用 volatile hasNewMail,因为还需要在匿名内中修改
  7. final AtomicBoolean hasNewMail = new AtomicBoolean(false);
  8. try {
  9. for (final String host : hosts)//循环检索每台主机
  10. exec.execute(new Runnable() {//执行任务
  11. public void run() {
  12. if (checkMail(host))
  13. hasNewMail.set(true);
  14. }
  15. });
  16. } finally {
  17. // 因为ExecutorService只在这个方法中服务,所以完成后即可关闭
  18. exec.shutdown();
  19. // 等待任务的完成,如果超时还未完成也会返回
  20. exec.awaitTermination(timeout, unit);
  21. }
  22. return hasNewMail.get();
  23. }
  • ExecutorService - shutdownNow 局限性
  1. // 我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查
  2. public class TrackingExecutor extends AbstractExecutorService {
  3. private final ExecutorService exec;
  4. private final Set<Runnable> tasksCancelledAtShutdown =
  5. Collections.synchronizedSet(new HashSet<Runnable>());
  6. public TrackingExecutor(ExecutorService exec) {
  7. this.exec = exec;
  8. }
  9. //返回被取消的任务
  10. public List<Runnable> getCancelledTasks() {
  11. // 如果 shutdownNow 未调用或调用未完成时
  12. if (!exec.isTerminated())
  13. throw new IllegalStateException("");
  14. return new ArrayList<Runnable>(tasksCancelledAtShutdown);
  15. }
  16. public void execute(final Runnable runnable) {
  17. exec.execute(new Runnable() {
  18. public void run() {
  19. try {
  20. runnable.run();
  21. // 参考:http://blog.csdn.net/coslay/article/details/48038795
  22. // 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
  23. // 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
  24. // 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
  25. // 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
  26. // 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
  27. // 将runnable.run()与下面的if放在一个同步块、而且还要将
  28. // shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
  29. // 这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
  30. // 只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
  31. // 性能上的问题,因为什么所有的任务都会串形执行,这样还要
  32. // ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
  33. // 这是“不可避免的竞争条件”
  34. } finally {
  35. //如果调用了 shutdownNow 且运行的任务被中断
  36. if (isShutdown()
  37. && Thread.currentThread().isInterrupted())
  38. // 记录被取消的任务
  39. tasksCancelledAtShutdown.add(runnable);
  40. }
  41. }
  42. });
  43. }
  44. // 将ExecutorService 中的其他方法委托到exec
  45. }

3. 处理非正常的线程终止

  • 非正常线程终止
  1. 1) 导致线程提前推出的最主要原因就是 RuntimeException
  2. 2) 在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中
  3. 3) 任何代码都可能抛出一个 RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常
  4. 4) 示例
  5. // 如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结
  6. // 工作者线程的实现
  7. public void run() {
  8. Throwable thrown = null;
  9. try {
  10. while (!isInterrupted())
  11. runTask(getTaskFromWorkQueue());
  12. // 为了安全,捕获的所有异常
  13. } catch (Throwable e) {
  14. // 保留异常信息
  15. thrown = e;
  16. } finally {
  17. // 重新将异常抛给框架后终结工作线程
  18. threadExited(this, thrown);
  19. }
  20. }
  • 未捕获异常的线程
  1. Thread API 中提供了 UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况
  2. // 在运行时间较长的应用程序中,通常会为所有的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。
  3. public class UEHLogger implements Thread.UncaughtExceptionHandler {
  4. public void uncaughtException(Thread t, Throwable e) {
  5. Logger logger = Logger.getAnonymousLogger();
  6. logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
  7. }
  8. }

4. JVM 关闭钩子

JVM 既可通过正常手段来关闭,也可强行关闭。

  1. 正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时
  2. 强行关闭:Runtime.halt,这种强行关闭方式将无法保证是否将运行关闭钩子
  • 关闭钩子
  1. 关闭钩子是线程安全的: 访问共享数据时候必须使用同步机制, 小心避免发生死锁
  2. 1) 关闭钩子是指通过 Runnable.addShutdownHook 注册的但尚未开始的线程
  3. 2) JVM 并不能保证关闭钩子的调用顺序
  4. 3) 当所有的关闭钩子都执行结束时,如果 runFinalizersOnExittrue, 那么 JVM 将运行终结器(finalize), 然后再停止
  5. 4) JVM 并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM 必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子
  6. 5) 关闭钩子应该是线程安全的
  7. 6) 关闭钩子必须尽快退出,因为它们会延迟 JVM 的结束时间
  8. // 通过注册关闭钩子,停止日志服务
  9. public void start()
  10. {
  11. Runnable.getRuntime().addShutdownHook(new Thread(){
  12. public void run()
  13. {
  14. try{LogService.this.stop();}
  15. catch(InterruptedException ignored){}
  16. }
  17. });
  18. }
  • 守护线程 - 一个线程来执行一些辅助工作,但有不希望这个线程阻碍 JVM 的关闭
  1. 1) 线程可分为两种:普通线程和守护线程。在 JVM 启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾收集器以及其他执行辅助工作的线程)
  2. 2) 普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM 会检查其他正在运行的线程,如果这些线程都是守护线程,那么 JVM 会正常退出操作。当 JVM 停止时,所有仍然存在的守护线程都将被抛弃——既不会执行 finally 代码块,也不会执行回卷栈,而 JVM 只是直接退出.
  3. 3) 尽可能少使用守护线程
  4. a) 因为很少有操作能够在不进行清理的情况下被安全的抛弃.
  5. b) 特别是如果守护线程进行 I/O 操作的任务, 那么是非常危险的.
  6. c) 守护线程最好执行 Java 内部的任务, 例如周期从内存的缓冲中移除过期的数据.
  7. 4) 守护线程通常不能用来替代应用程序中各个服务的生命周期
  • 终结器 - 清理文件句柄或套接字句柄等(避免使用)
  1. 1) 垃圾回收器对那些定义了 finalize 方法的对象会进行特殊处理:在回收器释放它们后,调用它们的 finalize 方法,从而确保一些持久化的资源被释放。
  2. 2) 通过使用 finally 代码块和显式的 close 方法,能够比使用终结器更好地管理资源
  3. 3) 例外:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的

三. 线程池的使用

1. 在任务与执行策略之间隐形解耦

  1. 并非所有的任务都能使用所有的执行策略。有些类型的任务需要明确地指定执行策略,包括:
  2. 1) 依赖性任务:如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生活跃性问题。
  3. 2) 使用线程封闭机制的任务:任务要求其执行所在的 Executor 是单线程的。如果将 Executor 从单线程环境改为线程池环境,那么将失去线程安全性。
  4. 3) 对响应时间敏感的任务:如果将一个运行时间较长的任务提交到单线程的 Executor 中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该 Executor 管理的服务的响应性。
  5. 4) 使用 ThreadLocal 的任务:只有当线程本地值的生命受限于任务的生命周期时,在线程池的线程中使用 ThreadLocal 才有意义,而在线程池中不应该使用 ThreadLocal 在任务之间传递值。
  • 线程饥饿死锁
  1. 1) 在线程中,如果任务依赖与其他任务,那么可能产生死锁。
  2. 2) 在单线程的 Executor 中,如果一个任务将另一个任务提交到同一个 Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。
  3. 3) 在更大的线程池中,如果所有正在执行的任务的线程都由于等待其他仍处于工作队列的任务而阻塞,那么会发生同样的问题,这个现象被称为线程饥饿死锁(Thread Starvation Deadlock)。
  • 运行时间较长的任务
  1. 有限线程池线程可能会被执行时间长任务占用过长时间,最终导致执行时间短的任务也被拉长了“执行”时间。可以考虑限定任务等待资源的时间,而不要无限制地等待。

2. 设置线程池大小

  1. 1) 线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。
  2. 2) 在计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1,通常能实现最优的利用率。
  3. 3) 对于包含I/O操作或其他阻塞操作的任务,由于线程不会一直执行,因此线程池的规模应该更大、要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值,这可以通过一些分析或监控工具来获得。
  4. 4) CPU周期并不是唯一影响线程池大小的资源,还包括内存,文件句柄,套接字句柄和数据库连接等。

3. 配置 ThreadPoolExecutor

  • 线程的创建和销毁
  1. // ThreadPoolExecutor 是一个灵活的,稳定的线程池,允许进行各种定制。
  2. // 这些 Executor 是由 Executors 方法中的 newCachedThreadPool、newFixedThreadPool、newShedulerdThreadExecutor 等工厂方法返回的。
  3. public ThreadPoolExecutor(int corePoolSize,
  4. int maximumPoolSize,
  5. long keepAliveTime,
  6. TimeUnit unit,
  7. BlockingQueue<Runnable> workQueue,
  8. ThreadFactory threadFactory,
  9. RejectedExecutionHandler handler)
  10. { ... }
  11. 1) corePoolSize:基本大小也就是线程池的目标大小,即在没有任务执行时(初期线程并不启动,而是等到有任务提交时才启动,除非调用 prestartAllCoreThreads)线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
  12. 2) maximumPoolSize:线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
  13. 3) newFixedThreadPool:工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
  14. 4) newCachedThreadPool:对于 Executor, newCachedThreadPool 工厂方法是一个很好的默认选择
  15. a) 能提供比固定大小的线程池更好排队性能(因为使用了 SynchronousQueue 而不是 LinkedBlockingQueue)
  16. b) 工厂方法将线程池的最大大小设置为 Integ.MAX_VALUE, 而且将基本大小设置为 0,并将超时设置为 1 分钟,这种方法创建的线程池可以被无限扩展,并且当需求降低时会自动收缩。
  17. 5) 执行 excute 方法:
  18.  a) 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(需要获得全局锁)
  19.  b) 如果运行的线程等于或多于 corePoolSize ,则将任务加入 BlockingQueue
  20. c) 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
  21.  d) 如果创建新线程将使当前运行的线程超 maxiumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。
  • 管理队列任务
  1. 1) ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。
  2.  
  3.  基本的任务排队方法有3种:无界队列(unbounded queue)、有界队列(bounded queue)、同步移交(synchronous handoff)。
  4.   
  5. 2) newFixedThreadPool newSingleThreadExecutor 在默认情况下将使用一个无界的 LinkedBlockingQueue
  6. a) 如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续地达到,并且超过了线程池处理它们的速度,那么队列将无限制地增加。
  7. b) 一种更稳妥的资源管理策略是使用有界队列,例如 ArrayBlockingQueue ,有界的 LinkedBlockingQueue, PriorityBlockingQueue
  8. c) 有界队列有助于避免资源耗尽的情况发生,但队列填满后,由饱和策略解决
  9. 3) newCachedThreadPool 工厂方法中使用了 SynchronousQueue
  10. a) 对于非常大的或者无界的线程池,可以通过使用 SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。
  11. b) SynchronousQueue不是一个真正的队列,而是一种在线程之间移交的机制
  12. c) 要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么 TrheadPoolExecutor 将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后由工作者线程从队列中提取该任务
  13. d) 只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue 才有实际价值
  14. 4) 对于 ExecutornewCachedThreadPool 工厂方法是一种很好的默认选择,他能提供比固定大小的线程更好的排队性能(由于使用了 SynchronousQueue 而不是 LinkedBlockingQueue)。
  15.  当需要限制当前任务的数量以满足资源管理需求时,可以选择固定大小的线程池,就像在接受网络用户请求的服务器应用程序中,如果不进行限制,容易发生过载问题。
  • 饱和策略
  1. 当有界队列被填满后,饱和策略开始发挥作用。
  2. ThreadPoolExecutor 的饱和策略可以通过调用 setRejectedExecutionHandler 来修改。(如果某个任务被提交到一个已被关闭的 Executor 时,也会用到饱和策略)
  3. 1) AbortPolicy:“中止(Abort)策略”是默认的饱和策略,该策略将抛出未检查 Rejected-ExecutionException。调用这可以捕获这个异常,然后根据需求编写自己的处理代码。
  4. 2) DiscardPolicy:“抛弃(Discard)策略”会抛弃超出队列的任务。
  5. 3) DiscardOldestPolicy:“抛弃最旧策略“则会抛弃下个将被执行任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么抛弃最旧策略将导致抛弃优先级最高的任务,因此最好不要将抛弃最旧饱和策略和优先级队列一起使用)  
  6. 4) CallerRunsPolicy:“调用者运行策略“实现了一种调节机制。该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而减低新任务的流量。 它不会在线程池的某个线程中执行新提交的任务,新任务会在调用execute时在主线程中执行。
  • 线程工厂
  1. 1) 每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
  2. 2) 默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。
  3. 3) ThreadFactory 中只定义了一个方 newThread,每当线程池需要创建一个新线程都会调用这个方法。
  • 在调用构造函数后再定制 ThreadPoolExecutor
  1. 在调用完 ThreadPoolExecutor 的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小,最大大小,存活时间,线程工厂以及拒绝执行处理器(rejected execution handler))。如果 Executor 是通过 Executors 中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为 ThreadPoolExecutor 以访问设置器。

4. 扩展 ThreadPoolExecutor

  1. 1) ThreadPoolExecutor是可扩展的,它提供了几个可以在子类化中改写的方法:beforeExecuteafterExecuteterminated,这些方法可以用于扩展ThreadPoolExecutor的行为。 在执行任务的线程中将调用beforeExecuteafterExecute等方法,在这些方法中还可以添加日志,计时,监视或统计信息收集的功能。
  2. 2) 无论是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。
  3. 3) 在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知,记录日志或收集finalize统计信息等操作。

5. 递归算法的并行度

  1. // 如果在循环中包含了一些密集计算,或者需要执行可能阻塞的 I/O 操作,那么只要每次迭代是独立的,都可以对其进行并行化。
  2. //串行
  3. void processSequentially(List<Element> elements) {
  4. for (Element e : elements)
  5. process(e);
  6. }
  7. //并行
  8. void processInParallel(Executor exec, List<Element> elements) {
  9. for (final Element e : elements)
  10. exec.execute(new Runnable() {
  11. public void run() { process(e); }
  12. });
  13. }

四. 图形用户界面应用程序

  • 暂无