常见的线程池
- newFixedThreadPool 固定线程数量的线程池
- newWorkStealingPool 工作窃取线程池
- newSingleThreadExecutor 单个线程的线程池
- newCachedThreadPool带缓存机制的线程池
- newSingleThreadScheduledExecutor 单个线程带定时任务的线程池
- newScheduledThreadPool 固定核心线程数的定时线程池
ThreadPoolExecutor源码分析
public ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//最大线程数long keepAliveTime,//超时等待时间TimeUnit unit,//超时等待时间单位BlockingQueue<Runnable> workQueue,//工作的阻塞队列ThreadFactory threadFactory,//创建线程的工厂RejectedExecutionHandler handler) {//拒绝策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
execute(Runnable command)执行线程
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//工作的线程小于核心线程if (addWorker(command, true))//添加核心工作线程来处理工作return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {//大于核心线程数,且添加到阻塞队列成功int recheck = ctl.get();if (! isRunning(recheck) && remove(command))//再检查线程状态,如果处于非运行状态。移除对象reject(command);//拒绝策略else if (workerCountOf(recheck) == 0)//工作线程为0addWorker(null, false);//创建一个临时工作线程}else if (!addWorker(command, false))//添加临时线程来执行工作reject(command);//拒绝策略}
工作线程还没满的话,会创建核心线程来 来执行任务
核心线程满的话,会加入阻塞队列。加入成功的话会创建空线程执行获取队列
队列满了的话会创建临时线程来执行工作
临时线程也满的了的话,就会执行拒绝策略
addWorker(Runnable firstTask, boolean core)创建工作线程池
private boolean addWorker(Runnable firstTask, boolean core) {retry: //类似goto 伴随for循环使用,如果遇到continue就会跳回这里重新循环,遇到break就会结束循环for (;;) {//迭代int c = ctl.get();int rs = runStateOf(c);//获取线程状态// Check if queue empty only if necessary. 只在必要时检查队列是否为空if (rs >= SHUTDOWN && //RUNNING 运行中或者 关闭! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//大于可传教线程return false;if (compareAndIncrementWorkerCount(c))//cas 可添加线程break retry;//直接跳到最外层循环,并结束循环c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;//跳到最外层的循环,重新循环// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);//创建一个新的工作线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//重入锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//将线程缓存到工作池中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
通过cas来控制线程的数量
new Worker(firstTask)来创建新的工作线程
重入锁来控制多线程加入线程池的并发问题
Worker(Runnable firstTask)新工作线程的创建
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{...Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker 在运行线程之前禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}....}
worker 继承了AbstractQueuedSynchronizer方法来保证线程之前禁止被中断
实现了Runnable接口。通过调用.start()方法来开始工作
runWorker(Worker w)真正执行方法
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//获取要运行的真正线程w.firstTask = null;//gc回收w.unlock(); // allow interrupts 允许中断boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {//获取任务,如果返回空就会跳出while循环,自动释放工作线程w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);//预留方法可自己实现Throwable thrown = null;try {task.run();//调用执行的方法,而不是启动一个线程来执行} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;//记录完成的工作数量w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
工作线程会调用需要执行线程的run方法去执行任务,而不是创建一个新的线程,这样来复用一个线程
如果获取线程返回null的话,就跳出while循环,自动结束线程
getTask()去阻塞队列获取任务
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//减少工作线程return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//允许核心线程超时,或者。当前线程数超时核心线程数if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?//超时等待workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://执行队列超时等待workQueue.take();//没有任务的话阻塞在这里if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
通过timed变量来控制线程重队列里获取数据的时候是超时阻塞还是阻塞。
processWorkerExit(Worker w, boolean completedAbruptly) 移除线程池
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);//从线程池移除} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//判断是否需要创建新的线程(核心线程出现异常被销毁)if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);//创建一个线程}}
移除线程的时候也会加锁来保证线程安全
线程并没有区别核心线程和临时线程。回收的时候是随机回收线程的。
只有核心线程的时候也会因为出现异常也会被移除重新创建一个新的线程来代替
reject(Runnable command)拒绝策略
这里用了策略模式来根据不同的选择来决定走什么拒绝策略,JUC提供的拒绝策略有:
- 抛出异常
- 当前线程直接调用
- 队列中抛弃一个任务,让后再添加当前任务
- 什么都不做
支持用户自己去实现RejectedExecutionHandler接口来实现拒绝策略
