1. public class FutureTask<V> implements RunnableFuture<V> {
    2. //表示当前task状态
    3. private volatile int state;
    4. //当前任务尚未执行
    5. private static final int NEW = 0;
    6. //当前任务正在结束,稍微完全结束,一种临界状态
    7. private static final int COMPLETING = 1;
    8. //当前任务正常结束
    9. private static final int NORMAL = 2;
    10. //当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了
    11. private static final int EXCEPTIONAL = 3;
    12. //当前任务被取消
    13. private static final int CANCELLED = 4;
    14. //当前任务中断中..
    15. private static final int INTERRUPTING = 5;
    16. //当前任务已中断
    17. private static final int INTERRUPTED = 6;
    18. //submit(runnable/callable) runnable 使用 装饰者模式 伪装成 Callable了。
    19. private Callable<V> callable;
    20. //正常情况下:任务正常执行结束,outcome保存执行结果。 callable 返回值。
    21. //非正常情况:callable向上抛出异常,outcome保存异常
    22. private Object outcome; // non-volatile, protected by state reads/writes
    23. //当前任务被线程执行期间,保存当前执行任务的线程对象引用。
    24. private volatile Thread runner;
    25. //因为会有很多线程去get当前任务的结果,所以 这里使用了一种数据结构 stack 头插 头取 的一个队列。
    26. private volatile WaitNode waiters;
    27. @SuppressWarnings("unchecked")
    28. private V report(int s) throws ExecutionException {
    29. //正常情况下,outcome 保存的是callable运行结束的结果
    30. //非正常,保存的是 callable 抛出的异常。
    31. Object x = outcome;
    32. //条件成立:当前任务状态正常结束
    33. if (s == NORMAL)
    34. //直接返回callable运算结果
    35. return (V)x;
    36. //被取消状态
    37. if (s >= CANCELLED)
    38. throw new CancellationException();
    39. //执行到这,说明callable接口实现中,是有bug的...
    40. throw new ExecutionException((Throwable)x);
    41. }
    42. /**
    43. * Creates a {@code FutureTask} that will, upon running, execute the
    44. * given {@code Callable}.
    45. *
    46. * @param callable the callable task
    47. * @throws NullPointerException if the callable is null
    48. */
    49. public FutureTask(Callable<V> callable) {
    50. if (callable == null)
    51. throw new NullPointerException();
    52. //callable就是程序员自己实现的业务类
    53. this.callable = callable;
    54. //设置当前任务状态为 NEW
    55. this.state = NEW; // ensure visibility of callable
    56. }
    57. public FutureTask(Runnable runnable, V result) {
    58. //使用装饰者模式将runnable转换为了 callable接口,外部线程 通过get获取
    59. //当前任务执行结果时,结果可能为 null 也可能为 传进来的值。
    60. this.callable = Executors.callable(runnable, result);
    61. this.state = NEW; // ensure visibility of callable
    62. }
    63. public boolean isCancelled() {
    64. return state >= CANCELLED;
    65. }
    66. public boolean isDone() {
    67. return state != NEW;
    68. }
    69. public boolean cancel(boolean mayInterruptIfRunning) {
    70. //条件一:state == NEW 成立 表示当前任务处于运行中 或者 处于线程池 任务队列中..
    71. //条件二:UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
    72. // 条件成立:说明修改状态成功,可以去执行下面逻辑了,否则 返回false 表示cancel失败。
    73. if (!(state == NEW &&
    74. UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    75. mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    76. return false;
    77. try { // in case call to interrupt throws exception
    78. if (mayInterruptIfRunning) {
    79. try {
    80. //执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢。。
    81. Thread t = runner;
    82. //条件成立:说明当前线程 runner ,正在执行task.
    83. if (t != null)
    84. //给runner线程一个中断信号.. 如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。
    85. t.interrupt();
    86. } finally { // final state
    87. //设置任务状态为 中断完成。
    88. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    89. }
    90. }
    91. } finally {
    92. //唤醒所有get() 阻塞的线程。
    93. finishCompletion();
    94. }
    95. return true;
    96. }
    97. /**
    98. * @throws CancellationException {@inheritDoc}
    99. */
    100. //场景:多个线程等待当前任务执行完成后的结果...
    101. public V get() throws InterruptedException, ExecutionException {
    102. //获取当前任务状态
    103. int s = state;
    104. //条件成立:未执行、正在执行、正完成。 调用get的外部线程会被阻塞在get方法上。
    105. if (s <= COMPLETING)
    106. //返回task当前状态,可能当前线程在里面已经睡了一会了..
    107. s = awaitDone(false, 0L);
    108. return report(s);
    109. }
    110. /**
    111. * @throws CancellationException {@inheritDoc}
    112. */
    113. public V get(long timeout, TimeUnit unit)
    114. throws InterruptedException, ExecutionException, TimeoutException {
    115. if (unit == null)
    116. throw new NullPointerException();
    117. int s = state;
    118. if (s <= COMPLETING &&
    119. (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    120. throw new TimeoutException();
    121. return report(s);
    122. }
    123. protected void done() { }
    124. protected void set(V v) {
    125. //使用CAS方式设置当前任务状态为 完成中..
    126. //有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前 将 task取消了。 很小概率事件。
    127. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    128. outcome = v;
    129. //将结果赋值给 outcome之后,马上会将当前任务状态修改为 NORMAL 正常结束状态。
    130. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    131. //猜一猜?
    132. //最起码得把get() 再此阻塞的线程 唤醒..
    133. finishCompletion();
    134. }
    135. }
    136. protected void setException(Throwable t) {
    137. //使用CAS方式设置当前任务状态为 完成中..
    138. //有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前 将 task取消了。 很小概率事件。
    139. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    140. //引用的是 callable 向上层抛出来的异常。
    141. outcome = t;
    142. //将当前任务的状态 修改为 EXCEPTIONAL
    143. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    144. //回头讲完 get() 。
    145. finishCompletion();
    146. }
    147. }
    148. //submit(runnable/callable) -> newTaskFor(runnable) -> execute(task) -> pool
    149. //任务执行入口
    150. public void run() {
    151. //条件一:state != NEW 条件成立,说明当前task已经被执行过了 或者 被cancel 了,总之非NEW状态的任务,线程就不处理了。
    152. //条件二:!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())
    153. // 条件成立:cas失败,当前任务被其它线程抢占了...
    154. if (state != NEW ||
    155. !UNSAFE.compareAndSwapObject(this, runnerOffset,
    156. null, Thread.currentThread()))
    157. return;
    158. //执行到这里,当前task一定是 NEW 状态,而且 当前线程也抢占TASK成功!
    159. try {
    160. //callable 就是程序员自己封装逻辑的callable 或者 装饰后的runnable
    161. Callable<V> c = callable;
    162. //条件一:c != null 防止空指针异常
    163. //条件二:state == NEW 防止外部线程 cancel掉当前任务。
    164. if (c != null && state == NEW) {
    165. //结果引用
    166. V result;
    167. //true 表示callable.run 代码块执行成功 未抛出异常
    168. //false 表示callable.run 代码块执行失败 抛出异常
    169. boolean ran;
    170. try {
    171. //调用程序员自己实现的callable 或者 装饰后的runnable
    172. result = c.call();
    173. //c.call未抛出任何异常,ran会设置为true 代码块执行成功
    174. ran = true;
    175. } catch (Throwable ex) {
    176. //说明程序员自己写的逻辑块有bug了。
    177. result = null;
    178. ran = false;
    179. setException(ex);
    180. }
    181. if (ran)
    182. //说明当前c.call正常执行结束了。
    183. //set就是设置结果到outcome
    184. set(result);
    185. }
    186. } finally {
    187. // runner must be non-null until state is settled to
    188. // prevent concurrent calls to run()
    189. runner = null;
    190. // state must be re-read after nulling runner to prevent
    191. // leaked interrupts
    192. int s = state;
    193. if (s >= INTERRUPTING)
    194. //回头再说..讲了 cancel() 就明白了。
    195. handlePossibleCancellationInterrupt(s);
    196. }
    197. }
    198. protected boolean runAndReset() {
    199. if (state != NEW ||
    200. !UNSAFE.compareAndSwapObject(this, runnerOffset,
    201. null, Thread.currentThread()))
    202. return false;
    203. boolean ran = false;
    204. int s = state;
    205. try {
    206. Callable<V> c = callable;
    207. if (c != null && s == NEW) {
    208. try {
    209. c.call(); // don't set result
    210. ran = true;
    211. } catch (Throwable ex) {
    212. setException(ex);
    213. }
    214. }
    215. } finally {
    216. // runner must be non-null until state is settled to
    217. // prevent concurrent calls to run()
    218. runner = null;
    219. // state must be re-read after nulling runner to prevent
    220. // leaked interrupts
    221. s = state;
    222. if (s >= INTERRUPTING)
    223. handlePossibleCancellationInterrupt(s);
    224. }
    225. return ran && s == NEW;
    226. }
    227. /**
    228. * Ensures that any interrupt from a possible cancel(true) is only
    229. * delivered to a task while in run or runAndReset.
    230. */
    231. private void handlePossibleCancellationInterrupt(int s) {
    232. // It is possible for our interrupter to stall before getting a
    233. // chance to interrupt us. Let's spin-wait patiently.
    234. if (s == INTERRUPTING)
    235. while (state == INTERRUPTING)
    236. Thread.yield(); // wait out pending interrupt
    237. // assert state == INTERRUPTED;
    238. // We want to clear any interrupt we may have received from
    239. // cancel(true). However, it is permissible to use interrupts
    240. // as an independent mechanism for a task to communicate with
    241. // its caller, and there is no way to clear only the
    242. // cancellation interrupt.
    243. //
    244. // Thread.interrupted();
    245. }
    246. /**
    247. * Simple linked list nodes to record waiting threads in a Treiber
    248. * stack. See other classes such as Phaser and SynchronousQueue
    249. * for more detailed explanation.
    250. */
    251. static final class WaitNode {
    252. volatile Thread thread;
    253. volatile WaitNode next;
    254. WaitNode() { thread = Thread.currentThread(); }
    255. }
    256. /**
    257. * Removes and signals all waiting threads, invokes done(), and
    258. * nulls out callable.
    259. */
    260. private void finishCompletion() {
    261. // assert state > COMPLETING;
    262. //q指向waiters 链表的头结点。
    263. for (WaitNode q; (q = waiters) != null;) {
    264. //使用cas设置 waiters 为 null 是因为怕 外部线程使用 cancel 取消当前任务 也会触发finishCompletion方法。 小概率事件。
    265. if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
    266. for (;;) {
    267. //获取当前node节点封装的 thread
    268. Thread t = q.thread;
    269. //条件成立:说明当前线程不为null
    270. if (t != null) {
    271. q.thread = null;//help GC
    272. //唤醒当前节点对应的线程
    273. LockSupport.unpark(t);
    274. }
    275. //next 当前节点的下一个节点
    276. WaitNode next = q.next;
    277. if (next == null)
    278. break;
    279. q.next = null; // unlink to help gc
    280. q = next;
    281. }
    282. break;
    283. }
    284. }
    285. done();
    286. //将callable 设置为null helpGC
    287. callable = null; // to reduce footprint
    288. }
    289. private int awaitDone(boolean timed, long nanos)
    290. throws InterruptedException {
    291. //0 不带超时
    292. final long deadline = timed ? System.nanoTime() + nanos : 0L;
    293. //引用当前线程 封装成 WaitNode 对象
    294. WaitNode q = null;
    295. //表示当前线程 waitNode对象 有没有 入队/压栈
    296. boolean queued = false;
    297. //自旋
    298. for (;;) {
    299. //条件成立:说明当前线程唤醒 是被其它线程使用中断这种方式喊醒的。interrupted()
    300. //返回true 后会将 Thread的中断标记重置回false.
    301. if (Thread.interrupted()) {
    302. //当前线程node出队
    303. removeWaiter(q);
    304. //get方法抛出 中断异常。
    305. throw new InterruptedException();
    306. }
    307. //假设当前线程是被其它线程 使用unpark(thread) 唤醒的话。会正常自旋,走下面逻辑。
    308. //获取当前任务最新状态
    309. int s = state;
    310. //条件成立:说明当前任务 已经有结果了.. 可能是好 可能是 坏..
    311. if (s > COMPLETING) {
    312. //条件成立:说明已经为当前线程创建过node了,此时需要将 node.thread = null helpGC
    313. if (q != null)
    314. q.thread = null;
    315. //直接返回当前状态.
    316. return s;
    317. }
    318. //条件成立:说明当前任务接近完成状态...这里让当前线程再释放cpu ,进行下一次抢占cpu。
    319. else if (s == COMPLETING) // cannot time out yet
    320. Thread.yield();
    321. //条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象
    322. else if (q == null)
    323. q = new WaitNode();
    324. //条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
    325. else if (!queued){
    326. //当前线程node节点 next 指向 原 队列的头节点 waiters 一直指向队列的头!
    327. q.next = waiters;
    328. //cas方式设置waiters引用指向 当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
    329. queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
    330. }
    331. //第三次自旋,会到这里。
    332. else if (timed) {
    333. nanos = deadline - System.nanoTime();
    334. if (nanos <= 0L) {
    335. removeWaiter(q);
    336. return state;
    337. }
    338. LockSupport.parkNanos(this, nanos);
    339. }
    340. else
    341. //当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了..
    342. //除非有其它线程将你唤醒 或者 将当前线程 中断。
    343. LockSupport.park(this);
    344. }
    345. }
    346. private void removeWaiter(WaitNode node) {
    347. if (node != null) {
    348. node.thread = null;
    349. retry:
    350. for (;;) { // restart on removeWaiter race
    351. for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    352. s = q.next;
    353. if (q.thread != null)
    354. pred = q;
    355. else if (pred != null) {
    356. pred.next = s;
    357. if (pred.thread == null) // check for race
    358. continue retry;
    359. }
    360. else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
    361. q, s))
    362. continue retry;
    363. }
    364. break;
    365. }
    366. }
    367. }
    368. // Unsafe mechanics
    369. private static final sun.misc.Unsafe UNSAFE;
    370. private static final long stateOffset;
    371. private static final long runnerOffset;
    372. private static final long waitersOffset;
    373. static {
    374. try {
    375. UNSAFE = sun.misc.Unsafe.getUnsafe();
    376. Class<?> k = FutureTask.class;
    377. stateOffset = UNSAFE.objectFieldOffset
    378. (k.getDeclaredField("state"));
    379. runnerOffset = UNSAFE.objectFieldOffset
    380. (k.getDeclaredField("runner"));
    381. waitersOffset = UNSAFE.objectFieldOffset
    382. (k.getDeclaredField("waiters"));
    383. } catch (Exception e) {
    384. throw new Error(e);
    385. }
    386. }
    387. }