一条SQL的使命


  1. 客户端调用CN提供的查询rest接口,CN接收到请求后创建一个对应的Query对象放在队列里并转化为Statement树,同时返回一个QueryId给客户端用于后续获取状态以及获取结果QueuedStatementResource对象
  2. 在客户端初次通过QueryId进行查询时,CN会启动一个线程对这个Query对象进行调度SqlQueryExecution对象
  3. 首先将Query对象对应的Statement转化为AST对象;此阶段做语法校验、语句重写以及权限校验Analysis对象
  4. 执行计划编译器将语法树层层展开,把树转成树状的执行结构,每个节点都是一个操作;此时会优化器会发挥作用,生成效率更高的执行计划树。Plan 对象
  5. 对执行计划树进行分布式处理,使其可以并行的运算处理。SubPlan 对象1~5 是发生在Coordinator进程中的预查询阶段
  6. 遍历逻辑执行计划树获取需要执行的Stage集合SqlQueryScheduler#createStages方法:创建Stages集合列表StageExecutionPlan 对象
  7. 生成逻辑执行计划并开始进行任务调度,此时调度的最小单位是Task;Task的作用是在Worker拉起计算任务同时周期查询此Task的状态SqlQueryScheduler 对象6~7 是发生在Coordinator进程中的调度阶段
  8. Worker提供接口给Coordinator进行Task的状态更新TaskResource对象
  9. Worker有缓存TaskId和SqlTask的映射,在接收到请求时会根据TaskId获取对应的SqlTask来进行请求处理每个Task任务都有唯一对应的SqlTask对象,每个SqlTask对象都有唯一对应的TaskHolder对象TaskHolder对象存储Task的状态以及Task处理逻辑 SqlTaskExecution
  10. 先构造TaskContext 对象,再根据此对象来创建本地执行计划,最终根据本地执行计划构造Task执行器SqlTaskExecutionLocalExecutionPlanner 对象
  11. 在构造SqlTaskExecution对象时,其会以依赖注入方式构造对应的TaskExecutor 对象TaskExecutor对象通过构造函数并以线程池方式添加计算任务,由TaskRunner对象封装计算逻辑每个TaskRunner对象从待处理split队列头获取一个split进行处理;加锁根据DriverContext和算子集合来构造并执行Driver
  12. TaskExecutor对象通过DriverSplitRunner对Driver进行管理,此时执行Driver的逻辑释放算子Operator不用的内存资源,同时根据条件进行磁盘溢写操作;更新Driver的数据源split列表信息TaskRunner线程对象会依次处理Split/Driver队列中的任务直到队列为空,此时SqlTask对象的状态就会变为已完成状态Driver 对象
  13. 将该Task对应的TaskSource列表存到SqlTaskExecution中同时更新此Task对应的Driver的数据源。同时将当前Task状态信息返回给Coordinator
  14. 在更新Task对应的数据源时,开始进行任务调度SqlTaskExecution#schedulePartitionedSource8~14发生在Worker节点

    SqlTask


1、SqlTaskManager

  • 统一管理并监控Worker节点上的Task,并且通过SqlTask管理拆分后的每一个Task计算任务
  • 通过维护一张内存映射表,存储taskId映射对应的SqlTask并定期更新SqlTask的状态
  • SqlTaskExecutionFactory属性用于创建并且管理SqlTaskExecution,SqlTaskExecution是SqlTask的处理核心逻辑
  • 维护一个线程池专门用于调度SqlTask(此线程池的核心线程数跟SqlTask数量一致,确保每个Task都不需要等待)

    2、SqlTaskExecution

  • 每个SqlTask通过内部类TaskHolder绑定唯一的SqlTaskExecution,其封装SqlTask核心处理逻辑

  • 通过TaskExecutor以多线程方式处理SqlTask中的任务,这些线程会根据split来创建对应的driver进行处理
  • 维护一个线程池专门用于调度driver(此线程池的核心线程数跟SqlTask的子Task数量一致,确保每个Task都不需要等待)
  • addSources:首先更新需要添加的TaskSource列表,遍历当前Task里的所有driver并给每个driver更新数据源信息
  • updateSources:移除已经ack的split,对将要增加的split按照规则合并,获得当前计划对应的DriverSplitRunnerFactory
  • schedulePartitionedSource:
  • enqueueDriverSplitRunner:

    3、LocalExecutionPlanner

  • SqlTaskExecutionFactory通过LocalExecutionPlanner对SqlTask做解析、生成执行计划并且构造SqlTaskExecution

    4、Driver

    定义计算逻辑Driver ——> DriverContext ——> pipelineContext#start ——> taskContext#start
    Driver是拉取数据以及计算数据的最小单元createDriver:通过算子列表来构造Driver对象,每个Driver对象的任务就是执行完这些算子,同时生成TaskSource对象用于拉取数据源initialize:给每个算法上下文添加监听释放内存的操作,争取采用内存来做计算processInternal:执行一系列算子(其中有的算子是拉取数据)根据是否是申请资源的请求来决定分配内存/磁盘溢写更新TaskSource中的split如果当前算子还没忙完,并且下一个算子是非阻塞拉取数据,则开始执行下一个算子

    4、ExchangeOperator

    通过 ExchangeClient按Page来进行数据拉取,其在双端队列拉取数据的同时,会触发HttpPageBufferClient对象拉取数据缓存在双端队列HttpPageBufferClient#initiateRequest:向上游异步拉取序列化数据并存在双端队列(先ack再存);如果上游的数据已经拉取完了则异步发送请求给上游让其删除那部分数据向上游拉取数据的触发条件:① 在ExchangeClient在双端队列拉取数据的时触发 ② HttpPageBufferClient向上游拉取数据时,如果拉取数据不为空也会触发

    疑问

    1、SqlTaskManager由谁创建?怎么创建?

    2、TaskExecutor、TaskContext和TaskHandle三者是什么关系?

    TaskHandle和TaskExecutor分别是SqlTaskExecution用于处理任务的逻辑xxx的TaskContext是QueryContext的属性,间接隶属于SqlTask的变量

    参考文档

    https://blog.csdn.net/LW_Moving_Bricks/article/details/88984561https://www.jianshu.com/p/7285c510c01dhttps://blog.lovezhy.cc/2018/07/20/Presto源码-ExchangeClient和OutputBuffer详解/生成数据:https://www.jianshu.com/p/173de219379e博客侧栏:https://www.wandouip.com/t5i228630/Presto大佬的博客:http://armsword.com/2018/08/11/the-basic-concepts-of-presto/

    写在最后

    接触Presto已经有半年之久,虽然也看了不少Presto相关的资料以及也使用了Presto不短的时间,但是总是感觉很熟悉又很陌生;仔细思考,觉得之前对Presto的学习仅仅停留在使用以及查阅一些相关资料,对Presto的了解都是通过别人的眼睛,因此现在打算通过自己的眼睛,一点点的通过调试去梳理底层的逻辑。