核心构造方法

  1. //001
  2. /**
  3. * 当任务到来时corePool才被初始化
  4. *
  5. * 核心线程池大小
  6. * 1.cpu密集型:
  7. * CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
  8. * CPU密集任务只有在真正的多核CPU才可能得到加速(通过多线程)。
  9. * CPU密集型的任务配置尽可能少的线程数量:
  10. * 一般公式:CPU核数+1个线程的线程池。
  11. *
  12. * 2.IO密集型:(分两种):
  13. * 1.由于IO密集型任务的线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2
  14. * 2.IO密集型,即任务需要大量的IO,即大量的阻塞。
  15. * 在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。
  16. * 所以在IO密集型任务中使用多线程可以大大的加速程序运行。故需要·多配置线程数:
  17. * 参考公式:CPU核数/(1-阻塞系数 ) 阻塞系数在(0.8-0.9)之间
  18. * 比如8核CPU:8/(1-0.9) = 80个线程数
  19. */
  20. int corePoolSize = 2;
  21. /**最大线程池大小 当队列满的时候才会创建非核心线程*/
  22. int maximumPoolSize = 10;
  23. /**线程最大空闲时间*/
  24. long keepAliveTime = 60;
  25. /**时间单位*/
  26. java.util.concurrent.TimeUnit unit = java.util.concurrent.TimeUnit.SECONDS;
  27. /**线程等待队列*/
  28. BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>(100);
  29. /**
  30. * 线程创建工厂
  31. * ThreadFactory threadFactory = Executors.defaultThreadFactory();
  32. * return new Executors.DefaultThreadFactory();返回用于创建新线程的默认线程工厂
  33. * DefaultThreadFactory的构造方法主要做了 声明安全管理器 得到线程组 生成线程名前缀
  34. * 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限
  35. * ThreadFactory threadFactory = ThreadFactory.privilegedThreadFactory();
  36. * return new Executors.PrivilegedThreadFactory(); 主要添加了访问权限校验
  37. */
  38. ThreadFactory threadFactory = Executors.defaultThreadFactory();
  39. /**
  40. * 拒绝策略 核心代码void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
  41. *
  42. * 丢弃任务并抛出RejectedExecutionException异常。
  43. * private static final RejectedExecutionHandler defaultHandler =
  44. * new ThreadPoolExecutor.AbortPolicy();
  45. * 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  46. * private static final RejectedExecutionHandler handler =
  47. * new ThreadPoolExecutor.DiscardOldestPolicy();
  48. * 也是丢弃任务,但是不抛出异常。
  49. * private static final RejectedExecutionHandler handler =
  50. * new ThreadPoolExecutor.DiscardPolicy();
  51. * 由调用线程处理该任务
  52. * private static final RejectedExecutionHandler handler =
  53. * new ThreadPoolExecutor.CallerRunsPolicy();
  54. */
  55. RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
  56. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  57. corePoolSize, maximumPoolSize, keepAliveTime,
  58. unit, workQueue, threadFactory, handler);
  59. threadPoolExecutor.execute(
  60. ()->{
  61. try {
  62. Thread.sleep(1000);
  63. } catch (InterruptedException e) {
  64. System.out.println("Thread sleep fail");
  65. }
  66. });

添加Runnable接口

  1. //002 execute
  2. public void execute(Runnable command) {
  3. if (command == null) {
  4. throw new NullPointerException();
  5. } else {
  6. int c = this.ctl.get();
  7. //1,判断工作线程数<核心线程数
  8. //workerCountOf(c) -> c & 536870911 = 00011111 11111111 11111111 11111111
  9. if (workerCountOf(c) < this.corePoolSize) {
  10. //添加Worker
  11. if (this.addWorker(command, true)) {
  12. return;
  13. }
  14. c = this.ctl.get();
  15. }
  16. //2,运行态 将runnable加入队列
  17. if (isRunning(c) && this.workQueue.offer(command)) {
  18. int recheck = this.ctl.get();
  19. if (!isRunning(recheck) && this.remove(command)) {
  20. //拒绝 runnable
  21. this.reject(command);
  22. } else if (workerCountOf(recheck) == 0) {
  23. this.addWorker((Runnable)null, false);
  24. }//3,使用尝试使用最大线程运行
  25. } else if (!this.addWorker(command, false)) {
  26. this.reject(command);
  27. }
  28. }
  29. }

添加工作线程

  1. //003 addWorker
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3. //所有有效线程的数量
  4. int c = this.ctl.get();
  5. // 0<=c<=536870912
  6. label253:
  7. while(!runStateAtLeast(c, 0) || !runStateAtLeast(c, 536870912)
  8. && firstTask == null && !this.workQueue.isEmpty()) {
  9. //工作线程数量小于核心线程数量?最大线程数量
  10. while(workerCountOf(c) <
  11. ((core ? this.corePoolSize : this.maximumPoolSize) & 536870911)) {
  12. //自旋CAS
  13. if (this.compareAndIncrementWorkerCount(c)) {
  14. // 线程启动标志位
  15. boolean workerStarted = false;
  16. // 线程是否加入workers 标志位
  17. boolean workerAdded = false;
  18. ThreadPoolExecutor.Worker w = null;
  19. try {
  20. //创建worker
  21. //Worker extends AbstractQueuedSynchronizer implements Runnable
  22. w = new ThreadPoolExecutor.Worker(firstTask);
  23. Thread t = w.thread;
  24. if (t != null) {
  25. //this.mainLock = new ReentrantLock();构造方式时赋值
  26. ReentrantLock mainLock = this.mainLock;
  27. //非公平锁
  28. mainLock.lock();
  29. try {
  30. c = this.ctl.get();
  31. // 获取到锁以后仍需检查ctl,
  32. // 可能在上一个获取到锁处理的线程可能会改变runState
  33. // 如 ThreadFactory 创建失败 或线程池被 shut down等
  34. if (isRunning(c) || runStateLessThan(c, 536870912)
  35. && firstTask == null) {
  36. if (t.isAlive()) {
  37. throw new IllegalThreadStateException();
  38. }
  39. //添加到workers集合
  40. this.workers.add(w);
  41. int s = this.workers.size();
  42. if (s > this.largestPoolSize) {
  43. this.largestPoolSize = s;
  44. }
  45. workerAdded = true;
  46. }
  47. } finally {
  48. mainLock.unlock();
  49. }
  50. if (workerAdded) {
  51. // 启动线程 封装的太重了 runWorker(ThreadPoolExecutor.Worker w)
  52. //在这个方法里会有this.workers.remove(w);调用
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. // 失败操作
  59. if (!workerStarted) {
  60. this.addWorkerFailed(w);
  61. }
  62. }
  63. return workerStarted;
  64. }
  65. c = this.ctl.get();
  66. if (runStateAtLeast(c, 0)) {
  67. continue label253;
  68. }
  69. }
  70. return false;
  71. }
  72. return false;
  73. }

Worker类

  1. /**
  2. * Worker 实现了简单的 非重入互斥锁,互斥容易理解,非重入是为了避免线程池的一些控制方法获得重入锁,
  3. * 比如setCorePoolSize操作。注意 Worker 实现锁的目的与传统锁的意义不太一样。
  4. * 其主要是为了控制线程是否可interrupt,以及其他的监控,如线程是否 active(正在执行任务)。
  5. * 线程池里线程是否处于运行状态与普通线程不一样,
  6. * 普通线程可以调用 Thread.currentThread().isAlive() 方法来判断,
  7. * 而线程池,在run方法中可能在等待获取新任务,这期间线程线程是 alive 但是却不是 active。
  8. */
  9. //0031 Worker
  10. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  11. private static final long serialVersionUID = 6138294804551838833L;
  12. /** 每个worker有自己的内部线程,ThreadFactory创建失败时是null */
  13. final Thread thread;
  14. /** 初始化任务 */
  15. Runnable firstTask;
  16. /** 每个worker的完成任务数 */
  17. volatile long completedTasks;
  18. Worker(Runnable firstTask) {
  19. // 禁止线程在启动前被打断
  20. this.setState(-1);
  21. this.firstTask = firstTask;
  22. this.thread = getThreadFactory().newThread(this);
  23. }
  24. public void run() {
  25. ThreadPoolExecutor.this.runWorker(this);
  26. }
  27. // state = 0 代表未锁;state = 1 代表已锁
  28. protected boolean isHeldExclusively() {
  29. return this.getState() != 0;
  30. }
  31. protected boolean tryAcquire(int unused) {
  32. if (this.compareAndSetState(0, 1)) {
  33. this.setExclusiveOwnerThread(Thread.currentThread());
  34. return true;
  35. } else {
  36. return false;
  37. }
  38. }
  39. protected boolean tryRelease(int unused) {
  40. this.setExclusiveOwnerThread((Thread)null);
  41. this.setState(0);
  42. return true;
  43. }
  44. public void lock() {
  45. this.acquire(1);
  46. }
  47. public boolean tryLock() {
  48. return this.tryAcquire(1);
  49. }
  50. public void unlock() {
  51. this.release(1);
  52. }
  53. public boolean isLocked() {
  54. return this.isHeldExclusively();
  55. }
  56. // interrupt已启动线程
  57. void interruptIfStarted() {
  58. Thread t;
  59. if (this.getState() >= 0 && (t = this.thread) != null && !t.isInterrupted()) {
  60. try {
  61. t.interrupt();
  62. } catch (SecurityException var3) {
  63. }
  64. }
  65. }
  66. }

run

//004 runWorker
final void runWorker(ThreadPoolExecutor.Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许被 interrupt
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // loop 直至 task = null (线程池关闭、超时等)
        // 注意这里的getTask()方法,我们配置的阻塞队列会在这里起作用
        while(task != null || (task = this.getTask()) != null) {
            w.lock();
            // 如果线程池停止,确保线程中断; 如果没有,确保线程不中断。
            // 这需要在第二种情况下进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
            if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() &&
                 runStateAtLeast(this.ctl.get(), 536870912)) && !wt.isInterrupted()) {
                wt.interrupt();
            }
            try {
                // 前置切点
                this.beforeExecute(wt, task);

                try {
                    task.run();
                    // 后置切点
                    this.afterExecute(task, (Throwable)null);
                } catch (Throwable var14) {
                    this.afterExecute(task, var14);
                    throw var14;
                }
            } finally {
                task = null;
                ++w.completedTasks;
                w.unlock();
            }
        }

        completedAbruptly = false;
    } finally {
        // 线程退出工作
        this.processWorkerExit(w, completedAbruptly);
    }

}

getTask

//005 runWorker的主要任务就是一直loop循环
private Runnable getTask() {
    // 上一次 poll() 是否超时
    boolean timedOut = false;
    while(true) {
        int c = this.ctl.get();
        // 是否继续处理任务 可以参见上一篇的状态控制
        if (runStateAtLeast(c, 0) && (runStateAtLeast(c, 536870912) 
                || this.workQueue.isEmpty())) {
            this.decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 是否允许超时
        boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
        if (wc <= this.maximumPoolSize && (!timed || !timedOut) 
                || wc <= 1 && !this.workQueue.isEmpty()) {
            try {
                Runnable r = timed ? 
                 (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS)
                 :(Runnable)this.workQueue.take();
                if (r != null) {
                    return r;
                }

                timedOut = true;
            } catch (InterruptedException var6) {
                timedOut = false;
            }
        } else if (this.compareAndDecrementWorkerCount(c)) {
            return null;
        }
    }
}