如何启动FutureTask线程

  1. public Future<Object> create(String startTime) {
  2. FutureTask<Object> futureTask = new FutureTask(() -> {
  3. // todo:处理相关异步任务
  4. return object;
  5. });
  6. // 使用线程池等方式调用futureTask
  7. ExecutorService executor = Executors.newSingleThreadExecutor();
  8. executor.submit(futureTask);
  9. // Thread thread = new Thread(futureTask);
  10. // thread.start();
  11. return futureTask;
  12. }
  1. public<T> Object invokeServiceMethod(Class<T> clazz, String methodName, int seconds, Object... params) {
  2. Class<?>[] paramTypes = new Class<?>[params.length];
  3. for (int i=0; i < params.length; i++){
  4. paramTypes[i] = params[i].getClass();
  5. }
  6. FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>(){
  7. @Override
  8. public Object call() throws Exception {
  9. Method method = clazz.getDeclaredMethod(methodName, paramTypes);
  10. method.setAccessible(true);
  11. T bean = SpringBeanUtils.getBean(clazz);
  12. return method.invoke(bean, params);
  13. }
  14. });
  15. ExecutorService executor = Executors.newSingleThreadExecutor();
  16. executor.execute(future);
  17. try {
  18. // 设定获取值的阻塞等待时间
  19. Object result = future.get(seconds, TimeUnit.SECONDS);
  20. return result;
  21. } catch (InterruptedException | ExecutionException | TimeoutException e) {
  22. LogUtil.error("数据访问异常");
  23. LogUtil.error(e.getMessage(), e);
  24. return Boolean.FALSE;
  25. } finally{
  26. future.cancel(true);
  27. executor.shutdown();
  28. }
  29. }

针对FutureTask的创建过程分析问题

问题一: FutureTask 的构造函数,如何通过new来创建自带返回值的FutureTask对象

(1)FutureTask的构造函数源码

  1. /**
  2. * Creates a {@code FutureTask} that will, upon running, execute the
  3. * given {@code Callable}.
  4. *
  5. * @param callable the callable task
  6. * @throws NullPointerException if the callable is null
  7. */
  8. public FutureTask(Callable<V> callable) {
  9. if (callable == null)
  10. throw new NullPointerException();
  11. this.callable = callable;
  12. this.state = NEW; // ensure visibility of callable
  13. }
  14. /**
  15. * Creates a {@code FutureTask} that will, upon running, execute the
  16. * given {@code Runnable}, and arrange that {@code get} will return the
  17. * given result on successful completion.
  18. *
  19. * @param runnable the runnable task
  20. * @param result the result to return on successful completion. If
  21. * you don't need a particular result, consider using
  22. * constructions of the form:
  23. * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
  24. * @throws NullPointerException if the runnable is null
  25. */
  26. public FutureTask(Runnable runnable, V result) {
  27. this.callable = Executors.callable(runnable, result);
  28. this.state = NEW; // ensure visibility of callable
  29. }

FutureTask的构造函数传递的参数类型是Callable,Callable与Runnable相似,但是Callable可以抛出异常。
同时,Callable中的call()方法相比Runnable中的run()方法, 前者有返回值,后者没有。
(2) Callable接口和Runnable区别
Callable与Runnable相似,但是Callable可以抛出异常。
Callable中的call()方法有返回值,Runnable中的run()方法没有返回值。

  1. @FunctionalInterface
  2. public interface Callable<V> {
  3. V call() throws Exception;
  4. }

(3) Runnable接口

  1. @FunctionalInterface
  2. public interface Runnable {
  3. public abstract void run();
  4. }

(4) FutureTask如何使用callable

  1. // 成员变量callable
  2. private Callable<V> callable;
  3. public void run() {
  4. if (state != NEW ||
  5. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  6. null, Thread.currentThread()))
  7. return;
  8. try {
  9. Callable<V> c = callable;
  10. if (c != null && state == NEW) {
  11. V result;
  12. boolean ran;
  13. try {
  14. result = c.call();
  15. ran = true;
  16. } catch (Throwable ex) {
  17. result = null;
  18. ran = false;
  19. setException(ex);
  20. }
  21. if (ran)
  22. set(result);
  23. }
  24. } finally {
  25. // runner must be non-null until state is settled to
  26. // prevent concurrent calls to run()
  27. runner = null;
  28. // state must be re-read after nulling runner to prevent
  29. // leaked interrupts
  30. int s = state;
  31. if (s >= INTERRUPTING)
  32. handlePossibleCancellationInterrupt(s);
  33. }
  34. }

FutureTask启动run方法,会通过callable.call()获取到返回值result,然后将返回值result通过set()方法进行存储。
image.png
(5) set()方法和get()方法
set()方法中会把传入的callable的返回值,在线程安全的前提下,赋值给FutureTask的成员变量outcome。这个过程也代表了,启动Future会通过Callable来获取到一个结果,并将这个结果放到成员变量outcome,等待get()方法的获取。
image.png

  1. protected void set(V v) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. outcome = v;
  4. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  5. finishCompletion();
  6. }
  7. }
  8. public V get() throws InterruptedException, ExecutionException {
  9. int s = state;
  10. if (s <= COMPLETING)
  11. s = awaitDone(false, 0L);
  12. return report(s);
  13. }

问题二:FutureTask类型的对象futureTask,为何可以传入到Thread对象构造方法中执行

FutureTask实现了RunnableFuture接口,RunnableFuture分别继承了Runnable接口和Future接口。

问题三:为何FutureTask可以当做Future类型的返回值

FutureTask实现了RunnableFuture接口,RunnableFuture分别继承了Runnable接口和Future接口。
image.png

问题四:FutureTask对象执行get()方法时,是如何获取返回值的

调用get()方法时,如果state完成计算状态( s>COMPLETING ),会调用 report() 方法获取结果。
report()方法变量x赋值了outcome,outcome是前面在set()方法中callable对象的返回值,最后report()返回x值。

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. s = awaitDone(false, 0L);
  5. return report(s);
  6. }
  7. private V report(int s) throws ExecutionException {
  8. Object x = outcome;
  9. if (s == NORMAL)
  10. return (V)x;
  11. if (s >= CANCELLED)
  12. throw new CancellationException();
  13. throw new ExecutionException((Throwable)x);
  14. }

image.png

问题五:FutureTask对象执行get()方法时,为什么会阻塞

调用get()方法时,如果state还在计算状态(s <= COMPLETING),就会调用awaitDone()方法进行等待。
如果s=COMPLETING状态,就会调用Thread.yield()使线程进入就绪状态,等待CPU的调度。
如果s>COMPLETING状态,说明当前线程完成,可以通过 return 方式执行结束。
如果s<COMPLETING状态,会通过 LockSupport.park(this) 将线程进入阻塞等待状态。

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. s = awaitDone(false, 0L);
  5. return report(s);
  6. }
  7. private int awaitDone(boolean timed, long nanos)
  8. throws InterruptedException {
  9. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  10. WaitNode q = null;
  11. boolean queued = false;
  12. for (;;) {
  13. if (Thread.interrupted()) {
  14. removeWaiter(q);
  15. throw new InterruptedException();
  16. }
  17. int s = state;
  18. if (s > COMPLETING) {
  19. if (q != null)
  20. q.thread = null;
  21. return s;
  22. }
  23. else if (s == COMPLETING) // cannot time out yet
  24. Thread.yield();
  25. else if (q == null)
  26. q = new WaitNode();
  27. else if (!queued)
  28. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  29. q.next = waiters, q);
  30. else if (timed) {
  31. nanos = deadline - System.nanoTime();
  32. if (nanos <= 0L) {
  33. removeWaiter(q);
  34. return state;
  35. }
  36. LockSupport.parkNanos(this, nanos);
  37. }
  38. else
  39. LockSupport.park(this);
  40. }
  41. }

image.png