常见的线程池

  • newFixedThreadPool 固定线程数量的线程池
  • newWorkStealingPool 工作窃取线程池
  • newSingleThreadExecutor 单个线程的线程池
  • newCachedThreadPool带缓存机制的线程池
  • newSingleThreadScheduledExecutor 单个线程带定时任务的线程池
  • newScheduledThreadPool 固定核心线程数的定时线程池


ThreadPoolExecutor源码分析

  1. public ThreadPoolExecutor(int corePoolSize,//核心线程数
  2. int maximumPoolSize,//最大线程数
  3. long keepAliveTime,//超时等待时间
  4. TimeUnit unit,//超时等待时间单位
  5. BlockingQueue<Runnable> workQueue,//工作的阻塞队列
  6. ThreadFactory threadFactory,//创建线程的工厂
  7. RejectedExecutionHandler handler) {//拒绝策略
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

execute(Runnable command)执行线程

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {//工作的线程小于核心线程
  6. if (addWorker(command, true))//添加核心工作线程来处理工作
  7. return;
  8. c = ctl.get();
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {//大于核心线程数,且添加到阻塞队列成功
  11. int recheck = ctl.get();
  12. if (! isRunning(recheck) && remove(command))//再检查线程状态,如果处于非运行状态。移除对象
  13. reject(command);//拒绝策略
  14. else if (workerCountOf(recheck) == 0)//工作线程为0
  15. addWorker(null, false);//创建一个临时工作线程
  16. }
  17. else if (!addWorker(command, false))//添加临时线程来执行工作
  18. reject(command);//拒绝策略
  19. }

工作线程还没满的话,会创建核心线程来 来执行任务
核心线程满的话,会加入阻塞队列。加入成功的话会创建空线程执行获取队列
队列满了的话会创建临时线程来执行工作
临时线程也满的了的话,就会执行拒绝策略

addWorker(Runnable firstTask, boolean core)创建工作线程池

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry: //类似goto 伴随for循环使用,如果遇到continue就会跳回这里重新循环,遇到break就会结束循环
  3. for (;;) {//迭代
  4. int c = ctl.get();
  5. int rs = runStateOf(c);//获取线程状态
  6. // Check if queue empty only if necessary. 只在必要时检查队列是否为空
  7. if (rs >= SHUTDOWN && //RUNNING 运行中或者 关闭
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))//大于可传教线程
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))//cas 可添加线程
  18. break retry;//直接跳到最外层循环,并结束循环
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;//跳到最外层的循环,重新循环
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);//创建一个新的工作线程
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;//重入锁
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);//将线程缓存到工作池中
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s;
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start();//启动线程
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;

通过cas来控制线程的数量
new Worker(firstTask)来创建新的工作线程
重入锁来控制多线程加入线程池的并发问题

Worker(Runnable firstTask)新工作线程的创建

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable{
  4. ...
  5. Worker(Runnable firstTask) {
  6. setState(-1); // inhibit interrupts until runWorker 在运行线程之前禁止中断
  7. this.firstTask = firstTask;
  8. this.thread = getThreadFactory().newThread(this);
  9. }
  10. public void run() {
  11. runWorker(this);
  12. }
  13. ....
  14. }

worker 继承了AbstractQueuedSynchronizer方法来保证线程之前禁止被中断
实现了Runnable接口。通过调用.start()方法来开始工作

runWorker(Worker w)真正执行方法

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;//获取要运行的真正线程
  4. w.firstTask = null;//gc回收
  5. w.unlock(); // allow interrupts 允许中断
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {//获取任务,如果返回空就会跳出while循环,自动释放工作线程
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);//预留方法可自己实现
  21. Throwable thrown = null;
  22. try {
  23. task.run();//调用执行的方法,而不是启动一个线程来执行
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;//记录完成的工作数量
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

工作线程会调用需要执行线程的run方法去执行任务,而不是创建一个新的线程,这样来复用一个线程
如果获取线程返回null的话,就跳出while循环,自动结束线程

getTask()去阻塞队列获取任务

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();//减少工作线程
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // Are workers subject to culling?
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//允许核心线程超时,或者。当前线程数超时核心线程数
  14. if ((wc > maximumPoolSize || (timed && timedOut))
  15. && (wc > 1 || workQueue.isEmpty())) {
  16. if (compareAndDecrementWorkerCount(c))
  17. return null;
  18. continue;
  19. }
  20. try {
  21. Runnable r = timed ?//超时等待
  22. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://执行队列超时等待
  23. workQueue.take();//没有任务的话阻塞在这里
  24. if (r != null)
  25. return r;
  26. timedOut = true;
  27. } catch (InterruptedException retry) {
  28. timedOut = false;
  29. }
  30. }

通过timed变量来控制线程重队列里获取数据的时候是超时阻塞还是阻塞。

processWorkerExit(Worker w, boolean completedAbruptly) 移除线程池

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks;
  8. workers.remove(w);//从线程池移除
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) {//判断是否需要创建新的线程(核心线程出现异常被销毁)
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && ! workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min)
  20. return; // replacement not needed
  21. }
  22. addWorker(null, false);//创建一个线程
  23. }
  24. }

移除线程的时候也会加锁来保证线程安全
线程并没有区别核心线程和临时线程。回收的时候是随机回收线程的。
只有核心线程的时候也会因为出现异常也会被移除重新创建一个新的线程来代替

reject(Runnable command)拒绝策略

这里用了策略模式来根据不同的选择来决定走什么拒绝策略,JUC提供的拒绝策略有:

  • 抛出异常
  • 当前线程直接调用
  • 队列中抛弃一个任务,让后再添加当前任务
  • 什么都不做

支持用户自己去实现RejectedExecutionHandler接口来实现拒绝策略