1 Executor框架
1.1 ThreadPoolExecutor 属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;//29private static final int CAPACITY = (1 << COUNT_BITS) - 1;//536870911// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;//-536870912private static final int SHUTDOWN = 0 << COUNT_BITS;//0private static final int STOP = 1 << COUNT_BITS;//536870912private static final int TIDYING = 2 << COUNT_BITS;//1073741824private static final int TERMINATED = 3 << COUNT_BITS;//1610612736// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }//~CAPACITY=-536870912private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
AtomicInteger ctl:利用低29位表示线程池中线程数,通过高3位表示
线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成
execute –> addWorker –>runworker (getTask)线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
1.2 execute()
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建一条新线程执行command任务if (addWorker(command, true))return;c = ctl.get();}// 如果当前有效线程大于或等于核心线程数,并且线程池处于RUNNING状态,则将任务加入阻塞队列并成功,等待线程取出队列执行if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行拒绝策略方法处理任务if (! isRunning(recheck) && remove(command))reject(command);//线程池处于running状态,但是没有线程,则创建线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 往线程池中创建新的线程失败,则reject任务else if (!addWorker(command, false))reject(command);}
1.2.1 remove
如果存在此任务,则将其从执行器的内部队列中移除
public boolean remove(Runnable task) {boolean removed = workQueue.remove(task);tryTerminate(); // In case SHUTDOWN and now emptyreturn removed;}
1.2.2 reject
调用拒绝策略执行处理程序。
final void reject(Runnable command) {handler.rejectedExecution(command, this);}
1.3 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}}
addWoker方法实现的前半部分:
1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,cas线程数加1,跳出循环
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}
开始创建新的线程
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
1.4 Worker
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
其中Worker类设计如下:
1、继承了AQS类,可以方便的实现工作线程的中止操作;
2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
3、当前提交的任务firstTask作为参数传入Worker的构造方法;
1.5 runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
runWorker方法是线程池的核心:
1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
1.6 getTask
private Runnable getTask() {boolean timedOut = false; // 上次poll()是否超时for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查队列是否为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//当超过最大线程数或超时满足其一, 且 超过线程队列最大容量(CAPACITY=536870911)或队列为空时,ctr 进行CAS减一,成功返回null,退出if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;//从队列取出线程为null} catch (InterruptedException retry) {timedOut = false;}}}
1.6.1 workerCountOf
private static int workerCountOf(int c) { return c & CAPACITY; }
1.6.2 compareAndDecrementWorkerCount
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}
整个getTask操作在自旋下完成:
1、remove,remove()是从队列中删除第一个元素。remove() 的行为与 Collection 接口的相似
2、poll,但是新的 poll() 方法时有值时返回值,在空集合调用时不是抛出异常,只是返回 null,
3、take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞
2 Future和Callable实现
通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。
示例:
public class ExecutorByCallableTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);Future future = executorService.submit(new FutureTaskTest());try {System.out.println("线程返回结果:" + future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}class FutureTaskTest implements Callable {@Overridepublic Object call() throws Exception {return "my is FutureTaskTest...";}}
输出:
线程返回结果:my is FutureTaskTest...
在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
1、Callable接口类似于Runnable,只是Runnable没有返回值。
2、Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
3、Future.get方法会导致主线程阻塞,直到Callable任务执行完成;
2.1 submit
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
最后交给1.2 execute()
2.2 FutureTask
一种可取消的异步计算。此类提供了Future的基本实现,其中包含启动和取消计算、查询以查看计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;如果计算尚未完成,get方法将阻塞。计算完成后,无法重新启动或取消计算(除非使用runAndReset调用计算)。
FutureTask可用于包装可调用或可运行的对象。因为FutureTask实现Runnable,所以FutureTask可以提交给执行者执行。
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;//任务状态private static final int NEW = 0; //任务新建和执行中private static final int COMPLETING = 1; //任务将要执行完毕private static final int NORMAL = 2; //任务正常执行结束private static final int EXCEPTIONAL = 3; //任务异常private static final int CANCELLED = 4; //任务取消private static final int INTERRUPTING = 5; //任务线程即将被中断private static final int INTERRUPTED = 6; //任务线程已中断...
此任务的运行状态,最初为NEW。运行状态仅在方法set、setException和cancel中转换为终端状态。在完成过程中,状态可能会有一个瞬态值:completed(当结果正在设置时)或interruption(仅当中断运行器以满足取消(true)时)。从这些中间状态到最终状态的转换使用更便宜的有序/惰性写操作,因为值是唯一的,不能进一步修改。可能的状态转换:NEW -> completion -> NORMAL NEW -> completion -> exception NEW -> CANCELLED NEW -> interruption -> INTERRUPTED
2.3 FutureTask.get
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}
get的源码很简洁,首先校验参数,然后根据state状态判断是否超时,如果超时则异常,不超时则调用report(s)去获取最终结果。
当 s<= COMPLETING时,表明任务仍然在执行且没有被取消。如果它为true,那么走到awaitDone方法。
awaitDone是futureTask实现阻塞的关键方法,我们重点关注一下它的实现原理。
2.3.1 awaitDone
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 任务截止时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 自旋for (;;) {//线程中断则移除等待线程,并抛出异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 任务可能已经正常完成或者中断或者被取消了if (s > COMPLETING) {if (q != null)q.thread = null;return s;}// 可能任务线程被阻塞了,主线程让出CPUelse if (s == COMPLETING) // cannot time out yetThread.yield();// 等待线程节点为空,则初始化新节点并关联当前线程else if (q == null)q = new WaitNode();else if (!queued)// 等待线程入队列,成功则queued=truequeued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();//已经超时的话,移除等待节点if (nanos <= 0L) {removeWaiter(q);return state;}// 未超时,将当前线程挂起指定时间LockSupport.parkNanos(this, nanos);}else// timed=false时会走到这里,挂起当前线程LockSupport.park(this);}}
2.4 FutureTask.run
public void run() {//校验任务状态if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//获取结果result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
2.5 FutureTask.set
将此future的结果设置为给定的值。
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}
2.6 finishCompletion
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}
1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;
2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值,并通过LockSupport类unpark方法唤醒主线程;

