Java 线程池

1、当提交新任务时,异常如何处理呢?

先来看一段代码:

  1. ExecutorService threadPool = Executors.newFixedThreadPool(5);
  2. for (int i = 0; i < 5; i++) {
  3. threadPool.submit(() -> {
  4. System.out.println("current thread name" + Thread.currentThread().getName());
  5. Object object = null;
  6. System.out.print("result## "+object.toString());
  7. });
  8. }

显然,这段代码会有异常,再来看看运行结果
image.png
虽然没有结果输出,但是也没有抛出异常,所以无法感知任务出现了异常,所以需要添加try/catch。如下图:

  1. /**
  2. * ThreadPoolTest
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * @author Fcant 上午 09:27 2021/9/1/0001
  7. */
  8. public class ThreadPoolTest {
  9. public static void main(String[] args) {
  10. ExecutorService threadPool = Executors.newFixedThreadPool(5);
  11. for (int i = 0; i < 5; i++) {
  12. threadPool.submit(() -> {
  13. System.out.println("current thread name" + Thread.currentThread().getName());
  14. try {
  15. Object object = null;
  16. System.out.print("result## " + object.toString());
  17. } catch (Exception e) {
  18. System.out.println("程序出异常啦!!!");
  19. }
  20. });
  21. }
  22. }
  23. }

image.png
因此,线程的异常处理,可以直接try...catch捕获。

2、线程池exec.submit()的执行流程

通过debug上面有异常的submit方法,处理有异常submit方法的主要执行流程图如下:
2021-09-01-09-24-06-957878.png
submit方法执行流程

  1. //构造feature对象
  2. /**
  3. * @throws RejectedExecutionException {@inheritDoc}
  4. * @throws NullPointerException {@inheritDoc}
  5. */
  6. public Future<?> submit(Runnable task) {
  7. if (task == null) throw new NullPointerException();
  8. RunnableFuture<Void> ftask = newTaskFor(task, null);
  9. execute(ftask);
  10. return ftask;
  11. }
  12. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  13. return new FutureTask<T>(runnable, value);
  14. }
  15. public FutureTask(Runnable runnable, V result) {
  16. this.callable = Executors.callable(runnable, result);
  17. this.state = NEW; // ensure visibility of callable
  18. }
  19. public static <T> Callable<T> callable(Runnable task, T result) {
  20. if (task == null)
  21. throw new NullPointerException();
  22. return new RunnableAdapter<T>(task, result);
  23. }
  24. //线程池执行
  25. public void execute(Runnable command) {
  26. if (command == null)
  27. throw new NullPointerException();
  28. int c = ctl.get();
  29. if (workerCountOf(c) < corePoolSize) {
  30. if (addWorker(command, true))
  31. return;
  32. c = ctl.get();
  33. }
  34. if (isRunning(c) && workQueue.offer(command)) {
  35. int recheck = ctl.get();
  36. if (! isRunning(recheck) && remove(command))
  37. reject(command);
  38. else if (workerCountOf(recheck) == 0)
  39. addWorker(null, false);
  40. }
  41. else if (!addWorker(command, false))
  42. reject(command);
  43. }
  44. //捕获异常
  45. public void run() {
  46. if (state != NEW ||
  47. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  48. null, Thread.currentThread()))
  49. return;
  50. try {
  51. Callable<V> c = callable;
  52. if (c != null && state == NEW) {
  53. V result;
  54. boolean ran;
  55. try {
  56. result = c.call();
  57. ran = true;
  58. } catch (Throwable ex) {
  59. result = null;
  60. ran = false;
  61. setException(ex);
  62. }
  63. if (ran)
  64. set(result);
  65. }
  66. } finally {
  67. // runner must be non-null until state is settled to
  68. // prevent concurrent calls to run()
  69. runner = null;
  70. // state must be re-read after nulling runner to prevent
  71. // leaked interrupts
  72. int s = state;
  73. if (s >= INTERRUPTING)
  74. handlePossibleCancellationInterrupt(s);
  75. }
  76. }

通过以上分析,submit执行的任务,可以通过Future对象的get方法接收抛出的异常,再进行处理。再通过一个demo,看一下Future对象的get方法处理异常的姿势,如下图:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.Future;
  4. /**
  5. * ThreadPoolTest
  6. * <p>
  7. * encoding:UTF-8
  8. *
  9. * @author Fcant 上午 09:27 2021/9/1/0001
  10. */
  11. public class ThreadPoolTest {
  12. public static void main(String[] args) {
  13. ExecutorService threadPool = Executors.newFixedThreadPool(5);
  14. for (int i = 0; i < 5; i++) {
  15. Future future = threadPool.submit(() -> {
  16. System.out.println("current thread name" + Thread.currentThread().getName());
  17. Object object = null;
  18. System.out.print("result## " + object.toString());
  19. });
  20. try {
  21. future.get();
  22. } catch (Exception e) {
  23. System.out.println("程序出异常啦!!!");
  24. }
  25. }
  26. }
  27. }

image.png
因此,可以使用这两种方案处理线程池异常:

  1. 在任务代码try/catch捕获异常,
  2. 通过Future对象的get方法接收抛出的异常

    3、为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常

    也可以为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常,直接看这样实现的正确姿势:
    1. ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
    2. Thread t = new Thread(r);
    3. t.setUncaughtExceptionHandler(
    4. (t1, e) -> {
    5. System.out.println(t1.getName() + "线程抛出的异常"+e);
    6. });
    7. return t;
    8. });
    9. threadPool.execute(()->{
    10. Object object = null;
    11. System.out.print("result## " + object.toString());
    12. });
    运行结果:
    image.png

    4、重写ThreadPoolExecutorafterExecute方法,处理传递的异常引用

    这是jdk文档的一个demo:
    1. class ExtendedExecutor extends ThreadPoolExecutor {
    2. // 这可是jdk文档里面给的例子。。
    3. protected void afterExecute(Runnable r, Throwable t) {
    4. super.afterExecute(r, t);
    5. if (t == null && r instanceof Future<?>) {
    6. try {
    7. Object result = ((Future<?>) r).get();
    8. } catch (CancellationException ce) {
    9. t = ce;
    10. } catch (ExecutionException ee) {
    11. t = ee.getCause();
    12. } catch (InterruptedException ie) {
    13. Thread.currentThread().interrupt(); // ignore/reset
    14. }
    15. }
    16. if (t != null)
    17. System.out.println(t);
    18. }
    19. }}

    5、因此,被问到线程池异常处理,如何回答?

    2021-09-01-09-24-08-153881.png