线程池使用 FutureTask 时如果把拒绝策略设置为 DiscardPolicy 和 DiscardOldestPolicy,并且在被拒绝的任务的 Future 对象上调用了无参 get 方法,那么调用线程会一直被阻塞

问题复现

下面先通过一个简单的例子来复现问题。

  1. public class FutureTest {
  2. //(1)线程池单个线程,线程池队列元素个数为 1
  3. private final static ThreadPoolExecutor executorService = new
  4. ThreadPoolExecutor1, 1, 1L, TimeUnit.MINUTES,
  5. new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.
  6. DiscardPolicy());
  7. public static void mainString[] args throws Exception {
  8. //(2)添加任务 one
  9. Future futureOne = executorService.submit(new Runnable() {
  10. @Override
  11. public void run() {
  12. System.out.println(「start runable one」);
  13. try {
  14. Thread.sleep(5000);
  15. } catch InterruptedException e {
  16. e.printStackTrace();
  17. }
  18. }
  19. });
  20. //(3)添加任务 two
  21. Future futureTwo = executorService.submit(new Runnable() {
  22. @Override
  23. public void run() {
  24. System.out.println(「start runable two」);
  25. }
  26. });
  27. //(4)添加任务 three
  28. Future futureThree=null
  29. try {
  30. futureThree = executorService.submit(new Runnable() {
  31. @Override
  32. public void run() {
  33. System.out.println(「start runable three」);
  34. }
  35. });
  36. } catch Exception e {
  37. System.out.println(e.getLocalizedMessage());
  38. }
  39. System.out.println(「task one + futureOne.get()); //(5)等待任务 one 执行完毕
  40. System.out.println(「task two + futureTwo.get()); //(6)等待任务 two 执行完毕
  41. System.out.println(「task three + (futureThree==null null:futureThree.
  42. get())); // (7)等待任务 three 执行完毕
  43. executorService.shutdown(); //(8)关闭线程池,阻塞直到所有任务执行完毕
  44. }

输出结果为

线程池使用 FutureTask 时需要注意的事情 - 图1

代码(1)创建了一个单线程和一个队列元素个数为 1 的线程池,并且把拒绝策略设置为 DiscardPolicy。

代码(2)向线程池提交了一个任务 one,并且这个任务会由唯一的线程来执行,任务在打印 start runable one 后会阻塞该线程 5s。

代码(3)向线程池提交了一个任务 two,这时候会把任务 two 放入阻塞队列。

代码(4)向线程池提交任务 three,由于队列已满所以触发拒绝策略丢弃任务 three。从执行结果看,在任务 one 阻塞的 5s 内,主线程执行到了代码(5)并等待任务 one 执行完毕,当任务 one 执行完毕后代码(5)返回,主线程打印出 task one null。任务 one 执行完成后线程池的唯一线程会去队列里面取出任务 two 并执行,所以输出 start runable two,然后代码(6)返回,这时候主线程输出 task two null。然后执行代码(7)等待任务 three 执行完毕。从执行结果看,代码(7)会一直阻塞而不会返回,至此问题产生。如果把拒绝策略修改为 DiscardOldestPolicy,也会存在有一个任务的 get 方法一直阻塞,只是现在是任务 two 被阻塞。

但是如果把拒绝策略设置为默认的 AbortPolicy 则会正常返回,并且会输出如下结果。

  1. start runable one
  2. Task java.util.concurrent.FutureTask@135fbaa4 rejected from java.util.concurrent.Thr
  3. eadPoolExecutor@45ee12a7[Running, pool size = 1, active threads = 1, queued tasks =
  4. 1, completed tasks = 0]
  5. task one null
  6. start runable two
  7. task two null
  8. task three null

问题分析

要分析这个问题,需要看线程池的 submit 方法都做了什么,submit 方法的代码如下。

  1. public Future<? > submitRunnable task {
  2. ...
  3. //(1)装饰 Runnable 为 Future 对象
  4. RunnableFuture<Void> ftask = newTaskFortask, null);
  5. executeftask);
  6. //(6)返回 Future 对象
  7. return ftask
  8. }
  9. protected <T> RunnableFuture<T> newTaskForRunnable runnable, T value {
  10. return new FutureTask<T>(runnable, value);
  11. }
  12. public void executeRunnable command {
  13. ...
  14. //(2) 如果线程个数小于核心线程数则新增处理线程
  15. int c = ctl.get();
  16. if workerCountOf(c) < corePoolSize {
  17. if addWorker(command, true))
  18. return
  19. c = ctl.get();
  20. }
  21. //(3)如果当前线程个数已经达到核心线程数则把任务放入队列
  22. if isRunning(c) && workQueue.offer(command)) {
  23. int recheck = ctl.get();
  24. if (! isRunning(recheck) && remove(command))
  25. rejectcommand);
  26. else if workerCountOf(recheck) == 0
  27. addWorkernull, false);
  28. }
  29. //(4)尝试新增处理线程
  30. else if (! addWorker(command, false))
  31. rejectcommand); //(5)新增失败则调用拒绝策略
  32. }

在以上代码中,代码(1)装饰 Runnable 为 FutureTask 对象,然后调用线程池的 execute 方法。

代码(2)判断如果线程个数小于核心线程数则新增处理线程。

代码(3)判断如果当前线程个数已经达到核心线程数则将任务放入队列。

代码(4)尝试新增处理线程。失败则执行代码(5),否则直接使用新线程处理。代码(5)执行具体拒绝策略,从这里也可以看出,使用业务线程执行拒绝策略。

所以要找到上面例子中问题所在,只需要看代码(5)对被拒绝任务的影响,这里先看下拒绝策略 DiscardPolicy 的代码。

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

拒绝策略的 rejectedExecution 方法什么都没做,代码(4)调用 submit 后会返回一个 Future 对象。这里有必要再次重申,Future 是有状态的,Future 的状态枚举值如下。

  1. private static final int NEW = 0;
  2. private static final int COMPLETING = 1;
  3. private static final int NORMAL = 2;
  4. private static final int EXCEPTIONAL = 3;
  5. private static final int CANCELLED = 4;
  6. private static final int INTERRUPTING = 5;
  7. private static final int INTERRUPTED = 6;

在代码(1)中使用 newTaskFor 方法将 Runnable 任务转换为 FutureTask,而在 FutureTask 的构造函数里面设置的状态就是 NEW。

  1. public FutureTask(Runnable runnable, V result) {
  2. this.callable = Executors.callable(runnable, result);
  3. this.state = NEW; // ensure visibility of callable
  4. }

所以使用 DiscardPolicy 策略提交后返回了一个状态为 NEW 的 Future 对象。那么我们下面就需要看下当调用 Future 的无参 get 方法时 Future 变为什么状态才会返回,那就要看下 FutureTask 的 get()方法代码。

  1. public V get() throws InterruptedException ExecutionException {
  2. int s = state
  3. //当状态值 <=COMPLETING 时需要等待,否则调用 report 返回
  4. if s <= COMPLETING
  5. s = awaitDonefalse, 0L);
  6. return reports);
  7. }
  8. private V reportint s throws ExecutionException {
  9. Object x = outcome
  10. //状态值为 NORMAL 正常返回
  11. if s == NORMAL
  12. return Vx
  13. //状态值大于等于 CANCELLED 则抛出异常
  14. if s >= CANCELLED
  15. throw new CancellationException();
  16. throw new ExecutionException((Throwable)x);
  17. }

也就是说,当 Future 的状态 >COMPLETING 时调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素时并没有设置该 Future 的状态,后面也没有其他机会可以设置该 Future 的状态,所以 Future 的状态一直是 NEW,所以一直不会返回。同理,DiscardOldestPolicy 策略也存在这样的问题,最老的任务被淘汰时没有设置被淘汰任务对应 Future 的状态。

那么默认的 AbortPolicy 策略为啥没问题呢?其实在执行 AbortPolicy 策略时,代码(5)会直接抛出 RejectedExecutionException 异常,也就是 submit 方法并没有返回 Future 对象,这时候 futureThree 是 null。

所以当使用 Future 时,尽量使用带超时时间的 get 方法,这样即使使用了 DiscardPolicy 拒绝策略也不至于一直等待,超时时间到了就会自动返回。如果非要使用不带参数的 get 方法则可以重写 DiscardPolicy 的拒绝策略,在执行策略时设置该 Future 的状态大于 COMPLETING 即可。但是我们查看 FutureTask 提供的方法,会发现只有 cancel 方法是 public 的,并且可以设置 FutureTask 的状态大于 COMPLETING,则重写拒绝策略的具体代码如下。

  1. public class MyRejectedExecutionHandler implements RejectedExecutionHandler{
  2. @Override
  3. public void rejectedExecution(Runnable runable, ThreadPoolExecutor e) {
  4. if (! e.isShutdown()) {
  5. if(null ! = runable && runable instanceof FutureTask){
  6. ((FutureTask) runable).cancel(true);
  7. }
  8. }
  9. }
  10. }

使用这个策略时,由于在 cancel 的任务上调用 get()方法会抛出异常,所以代码(7)需要使用 try-catch 块捕获异常,因此将代码(7)修改为如下所示。

  1. try{
  2. System.out.println(「task three + (futureThree==null null:futureThree.
  3. get())); // (6)等待任务 three
  4. }catchException e){
  5. System.out.println(e.getLocalizedMessage());
  6. }

执行结果为

线程池使用 FutureTask 时需要注意的事情 - 图2

当然这相比正常情况多了一个异常捕获操作。最好的情况是,重写拒绝策略时设置 FutureTask 的状态为 NORMAL,但是这需要重写 FutureTask 方法,因为 FutureTask 并没有提供接口让我们设置。

小结

本节通过案例介绍了在线程池中使用 FutureTask 时,当拒绝策略为 DiscardPolicy 和 DiscardOldestPolicy 时,在被拒绝的任务的 FutureTask 对象上调用 get()方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的 get 方法以避免线程一直阻塞。