JDK5之前,工作单元和执行机制是混合的,JDK以后,将两者分开:

  • 工作单元:Runable、Callable定义
  • 执行机制:Executor框架负责

    Executor框架的两极调度模型

    用户程序通过Executor框架将任务映射为固定线程
    操作系统将用户线程映射为内核线程,CPU处理

Excutor框架的结构概览

image.png

三大组成部分

  1. 任务:Runable、Callable
  2. 任务调度执行:实现了ExecutorService的两个执行器,ThreadPoolExecutor和ScheduledThreadPoolExecutor
  3. 任务执行结果:Future接口,主要是实现Future接口的FutureTask

    整体类图

    image.png

    使用流程图

    image.png

    任务执行器

  4. ThreadPoolExecutor

    1. FixedThreadPool
    2. SingleThreadExecutor
    3. SingleThreadExecutor
  5. ScheduledThreadPoolExecutor

    1. ScheduledThreadPoolExecutor
    2. ScheduledThreadPoolExecutor

      任务

  6. Callable

  7. Runable

    任务结果

    Future接口

FixedThreadPool详解

image.png

  • 只有核心线程
  • keepAliveTime是回收多于核心线程的线程数
  • LinkedBlockingQueue是无界的

    SinleThreadExecutor详解

    image.png

  • 只有一个线程在玩

    CachedThreadPool详解

    image.png

    • 没有核心线程
    • 来任务就创线程

image.png

  1. 提交一个任务,通过offer到SyncQueue中
  2. 若有空闲的线程poll到了,就直接分配给这个线程
  3. 若没有线程poll,则立即新建线程,进行poll操作

image.png

ScheduledThreadPoolExecutor详解

  • 作用:
    • 延迟时间执行任务
    • 定期执行任务

      运行机制

      主要是DelayQueue队列(无界)。所以最大线程池数无意义。
  1. 调用scheduleAtFixedRate()方法或者scheduleWith-FixedDelay()方法,会先延迟执行器中的DelayQueue中添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。
  2. 执行图

image.png
3.执行周期的实现
后面再说

ScheduledThreadPoolExecutor的实现

  • 队列中元素:ScheduledFutureTask

ScheduledFutureTask

  • long time:任务执行的具体时间
  • long sequenceNumber:被加入队列的序列号
  • long period:任务执行间隔周期
    1. PriorityQueue会把time小的排在前面,率先执行
    2. 若time相同,则比较sequenceNumber

      周期任务执行步骤

      image.png

      其实不用看,一定是先从队列中拿出来,再塞回去的一个过程

  1. DelayQueue.take()获取一个到期任务
  2. 线程执行任务,并重设time=time+周期时间
  3. DelayQueue.add(),再重加回队列

    DelayQueue中take的任务过程

    image.png

  4. 获取Lock锁

  5. 获取周期任务
    1. 若PriorityQueue空时,Condition等待
    2. 若有元素,且队头元素时间大于当前时间,await到那个time
    3. 可以获取到元素后,若队列不为空,唤醒所有线程
  6. 释放锁

    DelayQueue队列的add流程

image.png

  1. 获取Lock
  2. 添加任务
    1. 向PriorityQueue添加任务
    2. 若是头元素,唤醒所有等待的线程
  3. 释放Lock

FutureTask详解

FutureTask除了实现Future接口,还实现了Runnable接口。因此FutureTask交给Executor后,调用线程直接执行 FutureTask.run()

FutureTask的3种状态

  • 未启动:FutureTask.run()没有被执行之前
  • 已启动:调用了run但是没有运行完毕
  • 已完成

    • run完成
    • cancel
    • 异常

      状态迁移图

      -image.png
  • get方法

    • 未启动或已启动:会阻塞线程
    • 已完成:抛出异常或返回结果
  • cancel方法
    • cancel(true)
      • 未启动:任务将永远不被执行
      • 已启动:已中断方式试图停止
    • cancel(false)
      • 未启动:同上
      • 已启动:正常运行完毕
    • 已完成的任务:cancel方法统一返回false

image.png

FutureTask的使用

  • 产生:ExecutorService.submit()
  • 可执行的操作
    • get
    • cancel

FutureTask的实现

基于AQS
image.png

get的执行流程

  1. 调用AQS.acquireSharedInterruptibly(int arg)
    1. 成功,直接放回结果
    2. 失败,进入等待队列,进入第2步
  2. 其他线程release后,唤醒线程,当前线程获取后,离开等待队列,并唤醒后继线程
  3. 最后返回结果或抛异常

所有调用get方法的线程会进入等待队列。 任务完成后,会先唤醒队头队列,之后依次传播唤醒

run方法流程

  1. 执行Callable.call
  2. 原子更新状态,成功,则设置call()放回的结果,再AQS.release
  3. 调用done()方法