1. 实现线程的4种方式

  1. 继承Thread类

    1. class DemoThread extends Thread {
    2. @Override
    3. public void run() {
    4. System.out.println("继承Thread启动线程");
    5. }
    6. }
  2. 实现Runnable接口

    1. class DemoRunnable implements Runnable {
    2. @Override
    3. public void run() {
    4. System.out.println("实现Runnable接口启动线程");
    5. }
    6. }
  3. 实现Callable 接口 + FutureTask

这种方式启动线程可以获得线程执行结果,可以处理异常

  1. class DemoCallable implements Callable<Integer>{
  2. @Override
  3. public Integer call() throws Exception {
  4. int sum = 0;
  5. for (int i = 0; i <= 100000; i++) {
  6. sum += i;
  7. }
  8. return sum;
  9. }
  10. }
  11. // 线程启动方式
  12. public static void main(String[] args) {
  13. ThreadDemo td = new ThreadDemo();
  14. //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
  15. FutureTask<Integer> result = new FutureTask<>(td);
  16. new Thread(result).start();
  17. //2.接收线程运算后的结果
  18. try {
  19. //FutureTask 可用于闭锁,在这里等待线程的执行结果,才会执行后面的语句
  20. Integer sum = result.get();
  21. System.out.println(sum);
  22. } catch (InterruptedException | ExecutionException e) {
  23. e.printStackTrace();
  24. }
  25. }
  1. 线程池创建线程

    1. //第一种
    2. Executors.newFixedThreadPool(3)
    3. //第二种(推荐)
    4. new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory)

    通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是在业务复杂的情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

    2. 线程池的好处

  2. 降低资源的耗尽

通过重复利用已经创建好的线程降低线程池的创建和销毁带来的损耗

  1. 提高响应速度

因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程池就能执行

  1. 线程管理

线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

3. 线程池核心类

Executor 接口 ExecutorService 接口,继承Executor AbstractExecutorService 抽象类,实现ExecutorService 接口 ThreadExecutorPool 具体实现类,实现AbstractExecutorService 抽象类

4. 常用四种线程池

Executors类负责创建线程池

  1. newFixedThreadPool(numberOfThreads:int):(固定线程池)ExecutorService 创建一个固定线程数量的线程池,并行执行的线程数量不变,线程当前任务完成后,可以被重用执行另一个任务
  2. newCachedThreadPool():(可缓存线程池)ExecutorService 创建一个线程池,按需创建新线程,如果线程的当前规模超过了处理需求时,阿么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制
  3. new SingleThreadExecutor();(单线程执行器)线程池中只有一个线程,依次执行任务
  4. new ScheduledThreadPool():线程池按时间计划来执行任务,允许用户设定执行任务的时间,类似于timer

    5. 线程池状态

    线程池有5种状态

  5. RUNNING

能接受新提交的任务,并且也能处理阻塞队列中的任务

  1. SHUTDOWN

关闭状态,可以接受新提交的任务,并且可以处理已经在队列中的任务

  1. STOP

不接受任务,也不会处理队列中的任务,会终止正在处理的任务

  1. TIDYING

所有任务都已经终止,workerCount (工作线程变量)的值为0

  1. TERMINATED

在terminated()执行完后进入该状态

线程池状态图 线程池 - 图1

6. 线程池执行流程

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。
执行过程如下

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount(工作线程) < corePoolSize(核心线程),则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,线程池处于运行状态,则将任务添加到该阻塞队列中
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

执行流程图
分布式事务-第 18 页.jpg

7. 阻塞队列

  1. ArrayBlockingQueue

是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

  1. LinkedBlockingQueue

一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列

  1. PriorityBlockingQueue

一个支持线程优先级排序的无界队列,默认自然进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序

  1. DelayQueue

一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时器满后才能从队列中获取元素

  1. SynchronousQueue

一个不存储 元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后就会被回收

  1. LinkedTransferQuere

一个由链表结构组成的无界阻塞队列,相当于其他队列,LinkedTransferQueue队列多了transfer和tryTransfer方法

  1. LinkedBlockingDeque

一个由链表结构组成的双向阻塞队列,队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半

8. 拒绝策略

当线程池接收的任务超过线程池最大数量时(超过maximunPoolSize),就会触发拒绝策略。
接收四种拒绝策略

  1. CallerRunsPolicy

    线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. r.run();
  4. }
  5. }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

  1. AbortPolicy

    处理程序遭到拒绝将抛出运行时 RejectedExecutionException

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. throw new RejectedExecutionException();
  3. }

这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接受的任务,记得做好记录)

  1. DiscardPolicy

    不能执行的任务将被删除

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

  1. DiscardOldestPolicy

    如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. e.getQueue().poll();
  4. e.execute(r);
  5. }
  6. }

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心

9. 线程池源码分析

ThreadPoolExecutor 线程池的创建和执行,了解线程池是如何来启动我们的工作线程的。线程池的执行线程大致分为提交线程、添加执行任务、执行线程三个步骤。

  1. @Test
  2. public void test02() throws InterruptedException {
  3. ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
  4. executor.execute(new Runnable() {
  5. @Override
  6. public void run() {
  7. System.out.println("执行run方法 execute");
  8. }
  9. });
  10. }

9.1. 提交线程

通过execute()方法提交工作线程

  1. public void execute(Runnable command) {
  2. // 没有提交线程抛出异常
  3. if (command == null)
  4. throw new NullPointerException();
  5. int c = ctl.get();
  6. // workerCountOf(c) 正在运行的线程数量
  7. // 正在运行的线程数量小于核心线程数
  8. if (workerCountOf(c) < corePoolSize) {
  9. // 把线程添加到执行任务中
  10. if (addWorker(command, true))
  11. return;
  12. c = ctl.get();
  13. }
  14. // 代码执行到这里,说明运行线程数量已经大于核心线程数,就需要尝试往队列中添加线程
  15. // 线程池在运行中,就往队列中添加线程
  16. if (isRunning(c) && workQueue.offer(command)) {
  17. int recheck = ctl.get();
  18. // 重新校验线程是不是在运行中,如果不是在运行中就从队列中删除线程
  19. if (! isRunning(recheck) && remove(command))
  20. // 执行决绝策略
  21. reject(command);
  22. else if (workerCountOf(recheck) == 0)
  23. // 这个方法很有意思,是添加执行线程任务,但是注意第一个参数应该是一个线程,他这里传了一个null
  24. // 这个方法在这里的作用是当传入的线程为null时,就会从线程池队列中拿任务线程来执行
  25. addWorker(null, false);
  26. }
  27. // 代码执行到这里,说明队列中可添加的线程数量也满了
  28. // 此时把希望寄托于最大线程数量
  29. // 把线程添加到执行任务中,如果当前正在执行的线程数量大于最大线程数量,执行拒绝策略
  30. else if (!addWorker(command, false))
  31. // 执行拒绝策略
  32. reject(command);
  33. }

9.2. 添加执行任务

添加执行任务其实就是满足线程执行条件后,调用addWorker()方法来执行线程,这个方法有两个参数,第一个是执行线程,第二个是boolean值,true为占用核心线程数量来执行线程,false表示占用最大线程数量来执行线程

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. // 线程池现在状态
  6. int rs = runStateOf(c);
  7. // 线程池状态是否为SHUTDOWN、STOP、TIDYING、TERMINATED状态中的一个
  8. // 并且当前状态为 SHUTDOWN、且传入的任务为 null,同时队列不为空
  9. // 那么就返回 false
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;
  15. for (;;) {
  16. int wc = workerCountOf(c);
  17. if (wc >= CAPACITY ||
  18. wc >= (core ? corePoolSize : maximumPoolSize))
  19. return false;
  20. // 增加线程启动数量
  21. if (compareAndIncrementWorkerCount(c))
  22. break retry;
  23. c = ctl.get(); // Re-read ctl
  24. if (runStateOf(c) != rs)
  25. continue retry;
  26. // else CAS failed due to workerCount change; retry inner loop
  27. }
  28. }
  29. boolean workerStarted = false;
  30. boolean workerAdded = false;
  31. Worker w = null;
  32. try {
  33. // 线程放到Worker中
  34. w = new Worker(firstTask);
  35. final Thread t = w.thread;
  36. if (t != null) {
  37. final ReentrantLock mainLock = this.mainLock;
  38. mainLock.lock();
  39. try {
  40. // Recheck while holding lock.
  41. // Back out on ThreadFactory failure or if
  42. // shut down before lock acquired.
  43. int rs = runStateOf(ctl.get());
  44. if (rs < SHUTDOWN ||
  45. (rs == SHUTDOWN && firstTask == null)) {
  46. if (t.isAlive()) // precheck that t is startable
  47. throw new IllegalThreadStateException();
  48. workers.add(w);
  49. int s = workers.size();
  50. if (s > largestPoolSize)
  51. largestPoolSize = s;
  52. workerAdded = true;
  53. }
  54. } finally {
  55. mainLock.unlock();
  56. }
  57. if (workerAdded) {
  58. // 启动线程,执行Workder对象的run()方法
  59. // 在Worker对象中在执行目标线程(添加到Worker中的线程)的start()方法
  60. t.start();
  61. workerStarted = true;
  62. }
  63. }
  64. } finally {
  65. if (! workerStarted)
  66. // 线程执行失败时处理
  67. addWorkerFailed(w);
  68. }
  69. return workerStarted;
  70. }

9.3. 执行线程

把线程放到执行任务中后,接下来就是执行线程。Worker类本身是一个Runnable类,在添加执行任务后,会调用Worker对象的start()方法启动执行任务线程,最终调用Worker类中的run()方法

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable {
  4. // 省略代码
  5. public void run() {
  6. // 通过runWorker()方法执行目标线程
  7. runWorker(this);
  8. }
  9. }

Worker类中的run()方法调用runWorker()方法执行目标线程

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. // 如果task !=null 那么执行核心线程或者最大线程
  9. // 否则从队列中拿线程执行
  10. while (task != null || (task = getTask()) != null) {
  11. // AQS独占锁
  12. w.lock();
  13. // 线程如果是STOP状态,终止线程的执行
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. // 空方法,留给子类实现,用于在启动目标线程前的逻辑
  21. beforeExecute(wt, task);
  22. Throwable thrown = null;
  23. try {
  24. // 执行目标线程
  25. task.run();
  26. } catch (RuntimeException x) {
  27. thrown = x; throw x;
  28. } catch (Error x) {
  29. thrown = x; throw x;
  30. } catch (Throwable x) {
  31. thrown = x; throw new Error(x);
  32. } finally {
  33. // 空方法,留给子类实现,用于在目标线程启动结束后的逻辑处理
  34. afterExecute(task, thrown);
  35. }
  36. } finally {
  37. task = null;
  38. w.completedTasks++;
  39. w.unlock();
  40. }
  41. }
  42. completedAbruptly = false;
  43. } finally {
  44. processWorkerExit(w, completedAbruptly);
  45. }
  46. }

9.4. 获取队列任务

如果Worker中的firstTask 为空,说明不是执行核心线程和最大线程,那么通过getTask()从队列中获取线程执行

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. // 线程池状态
  6. int rs = runStateOf(c);
  7. // 队列为空,返回null
  8. // 线程池状态不是SHUTDOWN
  9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  10. decrementWorkerCount();
  11. return null;
  12. }
  13. int wc = workerCountOf(c);
  14. // Are workers subject to culling?
  15. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  16. if ((wc > maximumPoolSize || (timed && timedOut))
  17. && (wc > 1 || workQueue.isEmpty())) {
  18. if (compareAndDecrementWorkerCount(c))
  19. return null;
  20. continue;
  21. }
  22. try {
  23. // 从队列中弹出一个线程
  24. // keepAliveTime 从队列中最大获取线程的等待时间
  25. Runnable r = timed ?
  26. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  27. workQueue.take();
  28. if (r != null)
  29. // 返回目标线程
  30. return r;
  31. timedOut = true;
  32. } catch (InterruptedException retry) {
  33. timedOut = false;
  34. }
  35. }
  36. }