1. threadPoolExecutor.execute(new MyTask(i));
  2. public void execute(Runnable command) {
  3. if (command == null)
  4. throw new NullPointerException();
  5. //clt记录着runState和workerCount
  6. int c = ctl.get();
  7. //workerCountOf当前线程数,当前的线程数是否小于核心线数
  8. //添加到核心线程
  9. if (workerCountOf(c) < corePoolSize) {
  10. //新建一个线程并把任务添加到该线程中
  11. if (addWorker(command, true))
  12. return;
  13. c = ctl.get();
  14. }
  15. //isRunning当前线程池是否运行状态,offer添加任务到队列是否成功
  16. if (isRunning(c) && workQueue.offer(command)) {
  17. int recheck = ctl.get();
  18. //再次判断线程池是否在运行,不在运行,则需要把之前的添加的任务command移除
  19. if (! isRunning(recheck) && remove(command))
  20. reject(command);//这里是拒绝策略
  21. else if (workerCountOf(recheck) == 0)
  22. //如果当前工作线程池数是否为0,创建一个null任务
  23. //这样做的意义是保证线程池在running状态必须有一个任务在执行
  24. //让线程池保持run状态,可以一直接受任务
  25. addWorker(null, false);
  26. }
  27. //上面的核心线程和队列都放不在的情况下,把任务放进去非核心线程,core=false
  28. else if (!addWorker(command, false))
  29. //加不进非核心线程的情况下,受到maximumPoolSize的影响,会执行拒绝策略
  30. reject(command);
  31. }
  1. 判断当前的线程数是否小于corePoolSize,如果是,通过addWord方法创建一个新的线程,提交任务。
  2. 在第一步没有完成任务提交,状态为运行并且能成功加入任务到工作队列后,再进行一次check,如果状态在任务加入队列后变为了非运行状态,则进行拒绝策略。后再判断当前线程数是否为0,如果是,则新增一个空任务。
  3. 如果不能加入任务到工作队列,将尝试使用非核心线程创建一个任务
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. //自旋
  4. for (;;) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. // Check if queue empty only if necessary.
  8. //>= SHUTDOWN表示是关闭或者终止状态,不进行处理
  9. //= SHUTDOWN && 队列不为空,表示不接受任务,继续处理队列中的任务
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;
  15. for (;;) {
  16. // 获取线程数
  17. int wc = workerCountOf(c);
  18. //线程数超过容量,或者当前要添加的线程超出核心线程数或者最大线程数,
  19. //则不进行处理
  20. if (wc >= CAPACITY ||
  21. wc >= (core ? corePoolSize : maximumPoolSize))
  22. return false;
  23. //cas成功,添加一个新的工作线程,跳转到后面new Worker()
  24. if (compareAndIncrementWorkerCount(c))
  25. break retry;
  26. c = ctl.get(); // Re-read ctl
  27. if (runStateOf(c) != rs)
  28. continue retry;
  29. // else CAS failed due to workerCount change; retry inner loop
  30. }
  31. }
  32. boolean workerStarted = false;
  33. boolean workerAdded = false;
  34. Worker w = null;
  35. try {
  36. //创建一个线程
  37. w = new Worker(firstTask);
  38. final Thread t = w.thread;
  39. if (t != null) {
  40. //加锁
  41. final ReentrantLock mainLock = this.mainLock;
  42. mainLock.lock();
  43. try {
  44. // Recheck while holding lock.
  45. // Back out on ThreadFactory failure or if
  46. // shut down before lock acquired.
  47. int rs = runStateOf(ctl.get());
  48. //< SHUTDOWN 表示线程池是存活状态
  49. if (rs < SHUTDOWN ||
  50. (rs == SHUTDOWN && firstTask == null)) {
  51. //线程还没启动,但是线程已经是存活状态的这种情况,抛异常
  52. if (t.isAlive()) // precheck that t is startable
  53. throw new IllegalThreadStateException();
  54. // 把刚创建的线程添加到线程池hashSet中
  55. workers.add(w);
  56. int s = workers.size();
  57. if (s > largestPoolSize)
  58. largestPoolSize = s;
  59. workerAdded = true;
  60. }
  61. } finally {
  62. mainLock.unlock();
  63. }
  64. if (workerAdded) {
  65. t.start();//执行线程的任务,start会调用run方法,run会调用runWorker
  66. workerStarted = true;
  67. }
  68. }
  69. } finally {
  70. if (! workerStarted)
  71. addWorkerFailed(w);
  72. }
  73. return workerStarted;
  74. }
  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. //获取第一个任务
  4. Runnable task = w.firstTask;
  5. w.firstTask = null;
  6. //允许线程中断
  7. w.unlock(); // allow interrupts
  8. boolean completedAbruptly = true;
  9. try {
  10. //task(核心和非核心线程)不为空 或者阻塞队列中拿到了任务
  11. while (task != null || (task = getTask()) != null) {
  12. w.lock();
  13. //如果当前线程池状态等于stop 就中断
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. //执行线程真实的任务
  24. task.run();
  25. } catch (RuntimeException x) {
  26. thrown = x; throw x;
  27. } catch (Error x) {
  28. thrown = x; throw x;
  29. } catch (Throwable x) {
  30. thrown = x; throw new Error(x);
  31. } finally {
  32. afterExecute(task, thrown);
  33. }
  34. } finally {
  35. //设置task为null 则while会执行拿取下一个任务
  36. task = null;
  37. w.completedTasks++;
  38. w.unlock();
  39. }
  40. }
  41. completedAbruptly = false;
  42. } finally {
  43. processWorkerExit(w, completedAbruptly);
  44. }
  45. }

ThreadPoolExecutor内部有实现4个拒绝策略:

  • CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
  • AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
  • DiscardPolicy,直接抛弃任务,不做任何处理;
  • DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交;