基于jdk1.8

1 Executor框架

image.png

1.1 ThreadPoolExecutor 属性

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;//29
  3. private static final int CAPACITY = (1 << COUNT_BITS) - 1;//536870911
  4. // runState is stored in the high-order bits
  5. private static final int RUNNING = -1 << COUNT_BITS;//-536870912
  6. private static final int SHUTDOWN = 0 << COUNT_BITS;//0
  7. private static final int STOP = 1 << COUNT_BITS;//536870912
  8. private static final int TIDYING = 2 << COUNT_BITS;//1073741824
  9. private static final int TERMINATED = 3 << COUNT_BITS;//1610612736
  10. // Packing and unpacking ctl
  11. private static int runStateOf(int c) { return c & ~CAPACITY; }//~CAPACITY=-536870912
  12. private static int workerCountOf(int c) { return c & CAPACITY; }
  13. private static int ctlOf(int rs, int wc) { return rs | wc; }

AtomicInteger ctl:利用低29位表示线程池中线程数,通过高3位表示
线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成
execute –> addWorker –>runworker (getTask)线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

1.2 execute()

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建一条新线程执行command任务
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. // 如果当前有效线程大于或等于核心线程数,并且线程池处于RUNNING状态,则将任务加入阻塞队列并成功,等待线程取出队列执行
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. // recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
  15. //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行拒绝策略方法处理任务
  16. if (! isRunning(recheck) && remove(command))
  17. reject(command);
  18. //线程池处于running状态,但是没有线程,则创建线程
  19. else if (workerCountOf(recheck) == 0)
  20. addWorker(null, false);
  21. }
  22. // 往线程池中创建新的线程失败,则reject任务
  23. else if (!addWorker(command, false))
  24. reject(command);
  25. }

image.png

1.2.1 remove

如果存在此任务,则将其从执行器的内部队列中移除

  1. public boolean remove(Runnable task) {
  2. boolean removed = workQueue.remove(task);
  3. tryTerminate(); // In case SHUTDOWN and now empty
  4. return removed;
  5. }

1.2.2 reject

调用拒绝策略执行处理程序。

  1. final void reject(Runnable command) {
  2. handler.rejectedExecution(command, this);
  3. }

1.3 addWorker

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. }

addWoker方法实现的前半部分:
1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,cas线程数加1,跳出循环

  1. private boolean compareAndIncrementWorkerCount(int expect) {
  2. return ctl.compareAndSet(expect, expect + 1);
  3. }

开始创建新的线程

  1. boolean workerStarted = false;
  2. boolean workerAdded = false;
  3. Worker w = null;
  4. try {
  5. w = new Worker(firstTask);
  6. final Thread t = w.thread;
  7. if (t != null) {
  8. final ReentrantLock mainLock = this.mainLock;
  9. mainLock.lock();
  10. try {
  11. // Recheck while holding lock.
  12. // Back out on ThreadFactory failure or if
  13. // shut down before lock acquired.
  14. int rs = runStateOf(ctl.get());
  15. if (rs < SHUTDOWN ||
  16. (rs == SHUTDOWN && firstTask == null)) {
  17. if (t.isAlive()) // precheck that t is startable
  18. throw new IllegalThreadStateException();
  19. workers.add(w);
  20. int s = workers.size();
  21. if (s > largestPoolSize)
  22. largestPoolSize = s;
  23. workerAdded = true;
  24. }
  25. } finally {
  26. mainLock.unlock();
  27. }
  28. if (workerAdded) {
  29. t.start();
  30. workerStarted = true;
  31. }
  32. }
  33. } finally {
  34. if (! workerStarted)
  35. addWorkerFailed(w);
  36. }
  37. return workerStarted;

线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。

1.4 Worker

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. /** Initial task to run. Possibly null. */
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. /**
  17. * Creates with given first task and thread from ThreadFactory.
  18. * @param firstTask the first task (null if none)
  19. */
  20. Worker(Runnable firstTask) {
  21. setState(-1); // inhibit interrupts until runWorker
  22. this.firstTask = firstTask;
  23. this.thread = getThreadFactory().newThread(this);
  24. }
  25. /** Delegates main run loop to outer runWorker */
  26. public void run() {
  27. runWorker(this);
  28. }
  29. // Lock methods
  30. //
  31. // The value 0 represents the unlocked state.
  32. // The value 1 represents the locked state.
  33. protected boolean isHeldExclusively() {
  34. return getState() != 0;
  35. }
  36. protected boolean tryAcquire(int unused) {
  37. if (compareAndSetState(0, 1)) {
  38. setExclusiveOwnerThread(Thread.currentThread());
  39. return true;
  40. }
  41. return false;
  42. }
  43. protected boolean tryRelease(int unused) {
  44. setExclusiveOwnerThread(null);
  45. setState(0);
  46. return true;
  47. }
  48. public void lock() { acquire(1); }
  49. public boolean tryLock() { return tryAcquire(1); }
  50. public void unlock() { release(1); }
  51. public boolean isLocked() { return isHeldExclusively(); }
  52. void interruptIfStarted() {
  53. Thread t;
  54. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  55. try {
  56. t.interrupt();
  57. } catch (SecurityException ignore) {
  58. }
  59. }
  60. }
  61. }

其中Worker类设计如下:
1、继承了AQS类,可以方便的实现工作线程的中止操作;
2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
3、当前提交的任务firstTask作为参数传入Worker的构造方法;

1.5 runWorker

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

runWorker方法是线程池的核心:
1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

1.6 getTask

  1. private Runnable getTask() {
  2. boolean timedOut = false; // 上次poll()是否超时
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 检查队列是否为空
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // Are workers subject to culling?
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14. //当超过最大线程数或超时满足其一, 且 超过线程队列最大容量(CAPACITY=536870911)或队列为空时,ctr 进行CAS减一,成功返回null,退出
  15. if ((wc > maximumPoolSize || (timed && timedOut))
  16. && (wc > 1 || workQueue.isEmpty())) {
  17. if (compareAndDecrementWorkerCount(c))
  18. return null;
  19. continue;
  20. }
  21. try {
  22. Runnable r = timed ?
  23. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  24. workQueue.take();
  25. if (r != null)
  26. return r;
  27. timedOut = true;//从队列取出线程为null
  28. } catch (InterruptedException retry) {
  29. timedOut = false;
  30. }
  31. }
  32. }

1.6.1 workerCountOf

  1. private static int workerCountOf(int c) { return c & CAPACITY; }

1.6.2 compareAndDecrementWorkerCount

  1. private boolean compareAndDecrementWorkerCount(int expect) {
  2. return ctl.compareAndSet(expect, expect - 1);
  3. }

整个getTask操作在自旋下完成:
1、remove,remove()是从队列中删除第一个元素。remove() 的行为与 Collection 接口的相似
2、poll,但是新的 poll() 方法时有值时返回值,在空集合调用时不是抛出异常,只是返回 null,
3、take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞

2 Future和Callable实现

通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。
示例:

  1. public class ExecutorByCallableTest {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. Future future = executorService.submit(new FutureTaskTest());
  5. try {
  6. System.out.println("线程返回结果:" + future.get());
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. } catch (ExecutionException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. class FutureTaskTest implements Callable {
  15. @Override
  16. public Object call() throws Exception {
  17. return "my is FutureTaskTest...";
  18. }
  19. }

输出:

  1. 线程返回结果:my is FutureTaskTest...

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
1、Callable接口类似于Runnable,只是Runnable没有返回值。
2、Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
3、Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

2.1 submit

  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task);
  4. execute(ftask);
  5. return ftask;
  6. }

通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
最后交给1.2 execute()

2.2 FutureTask

一种可取消的异步计算。此类提供了Future的基本实现,其中包含启动和取消计算、查询以查看计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;如果计算尚未完成,get方法将阻塞。计算完成后,无法重新启动或取消计算(除非使用runAndReset调用计算)。
FutureTask可用于包装可调用或可运行的对象。因为FutureTask实现Runnable,所以FutureTask可以提交给执行者执行。

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. private volatile int state;//任务状态
  3. private static final int NEW = 0; //任务新建和执行中
  4. private static final int COMPLETING = 1; //任务将要执行完毕
  5. private static final int NORMAL = 2; //任务正常执行结束
  6. private static final int EXCEPTIONAL = 3; //任务异常
  7. private static final int CANCELLED = 4; //任务取消
  8. private static final int INTERRUPTING = 5; //任务线程即将被中断
  9. private static final int INTERRUPTED = 6; //任务线程已中断
  10. ...

此任务的运行状态,最初为NEW。运行状态仅在方法set、setException和cancel中转换为终端状态。在完成过程中,状态可能会有一个瞬态值:completed(当结果正在设置时)或interruption(仅当中断运行器以满足取消(true)时)。从这些中间状态到最终状态的转换使用更便宜的有序/惰性写操作,因为值是唯一的,不能进一步修改。可能的状态转换:NEW -> completion -> NORMAL NEW -> completion -> exception NEW -> CANCELLED NEW -> interruption -> INTERRUPTED

2.3 FutureTask.get

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. s = awaitDone(false, 0L);
  5. return report(s);
  6. }
  7. public V get(long timeout, TimeUnit unit)
  8. throws InterruptedException, ExecutionException, TimeoutException {
  9. if (unit == null)
  10. throw new NullPointerException();
  11. int s = state;
  12. if (s <= COMPLETING &&
  13. (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
  14. throw new TimeoutException();
  15. return report(s);
  16. }

get的源码很简洁,首先校验参数,然后根据state状态判断是否超时,如果超时则异常,不超时则调用report(s)去获取最终结果。
当 s<= COMPLETING时,表明任务仍然在执行且没有被取消。如果它为true,那么走到awaitDone方法。
awaitDone是futureTask实现阻塞的关键方法,我们重点关注一下它的实现原理。

2.3.1 awaitDone

  1. private int awaitDone(boolean timed, long nanos)
  2. throws InterruptedException {
  3. // 任务截止时间
  4. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  5. WaitNode q = null;
  6. boolean queued = false;
  7. // 自旋
  8. for (;;) {
  9. //线程中断则移除等待线程,并抛出异常
  10. if (Thread.interrupted()) {
  11. removeWaiter(q);
  12. throw new InterruptedException();
  13. }
  14. int s = state;
  15. // 任务可能已经正常完成或者中断或者被取消了
  16. if (s > COMPLETING) {
  17. if (q != null)
  18. q.thread = null;
  19. return s;
  20. }
  21. // 可能任务线程被阻塞了,主线程让出CPU
  22. else if (s == COMPLETING) // cannot time out yet
  23. Thread.yield();
  24. // 等待线程节点为空,则初始化新节点并关联当前线程
  25. else if (q == null)
  26. q = new WaitNode();
  27. else if (!queued)
  28. // 等待线程入队列,成功则queued=true
  29. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  30. q.next = waiters, q);
  31. else if (timed) {
  32. nanos = deadline - System.nanoTime();
  33. //已经超时的话,移除等待节点
  34. if (nanos <= 0L) {
  35. removeWaiter(q);
  36. return state;
  37. }
  38. // 未超时,将当前线程挂起指定时间
  39. LockSupport.parkNanos(this, nanos);
  40. }
  41. else
  42. // timed=false时会走到这里,挂起当前线程
  43. LockSupport.park(this);
  44. }
  45. }

2.4 FutureTask.run

  1. public void run() {
  2. //校验任务状态
  3. if (state != NEW ||
  4. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  5. null, Thread.currentThread()))
  6. return;
  7. try {
  8. Callable<V> c = callable;
  9. if (c != null && state == NEW) {
  10. V result;
  11. boolean ran;
  12. try {
  13. //获取结果
  14. result = c.call();
  15. ran = true;
  16. } catch (Throwable ex) {
  17. result = null;
  18. ran = false;
  19. setException(ex);
  20. }
  21. if (ran)
  22. set(result);
  23. }
  24. } finally {
  25. // runner must be non-null until state is settled to
  26. // prevent concurrent calls to run()
  27. runner = null;
  28. // state must be re-read after nulling runner to prevent
  29. // leaked interrupts
  30. int s = state;
  31. if (s >= INTERRUPTING)
  32. handlePossibleCancellationInterrupt(s);
  33. }
  34. }

2.5 FutureTask.set

将此future的结果设置为给定的值。

  1. protected void set(V v) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. outcome = v;
  4. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  5. finishCompletion();
  6. }
  7. }
  8. protected void setException(Throwable t) {
  9. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  10. outcome = t;
  11. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  12. finishCompletion();
  13. }
  14. }

2.6 finishCompletion

  1. private void finishCompletion() {
  2. // assert state > COMPLETING;
  3. for (WaitNode q; (q = waiters) != null;) {
  4. if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
  5. for (;;) {
  6. Thread t = q.thread;
  7. if (t != null) {
  8. q.thread = null;
  9. LockSupport.unpark(t);
  10. }
  11. WaitNode next = q.next;
  12. if (next == null)
  13. break;
  14. q.next = null; // unlink to help gc
  15. q = next;
  16. }
  17. break;
  18. }
  19. }
  20. done();
  21. callable = null; // to reduce footprint
  22. }

1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;
2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值,并通过LockSupport类unpark方法唤醒主线程;