Presto是一个开源的分布式SQL查询引擎,适用于对GB到PB量级的数据源进行交互式查询。Presto的服务器可以分为两种类型:coordinator和worker。Coordinator负责解析查询语句,制定执行计划,和管理worker节点。而worker负责具体任务的执行和数据处理。
本文首先简要介绍了Presto的服务器的启动流程,然后以一个查询请求的处理过程为线索,分析了coordinator对查询的处理流程。整个分析以323版本的PrestoSQL代码为基础.

1. 服务端启动

1.1 相关类

Presto的服务器端启动流程涉及到的类主要有3个:

  • PrestoServer:服务端入口
  • ServerMainModule:服务端的主要依赖注入配置
  • CoordinatorModule:Coordinator的依赖注入配置

    1.2 流程

    PrestoServer作为服务端的运行入口,在其run函数中,进行一系列的依赖注入配置,其中最主要的配置在ServerMainModule中。ServerMainModule针对服务端是否被配置为Coordinator/Worker,分别使用CoordinatorModule和WorkerModule进行配置。

    2. Coordinator

    Coordinator负责接受用户提交的查询请求,对这些请求进行分析,并调度到不同的worker节点上执行。

    2.1 用户执行查询相关Restful接口

    用户执行查询相关的Restful接口,在类QueuedStatementResource和ExecutingStatementResource中实现。
    从用户的角度看来,一个数据库查询在结束前主要经历两个状态,Queued(已提交,排队中)和Executing(执行中)。对于处于不同状态的查询,Presto把相关的Restful接口放在两个类中:
    QueuedStatementResource类负责处理处于Queued状态的查询,主要接口有:

  • postStatement(POST /v1/statement):提交一个新的查询,若成功,会返回新查询的id,nextUri,和新查询的状态等。其中,nextUri可以直接用于下面的getStatus和cancelQuery接口;

  • getStatus(GET /v1/statement/queued/{queryId}/{slug}/{token}):获取查询当前的状态,返回的响应中包含一个nextUri。根据查询的当前状态,这个nextUri的路径以/v1/statement/queued或者/v1/statement/executing为前缀。当以/v1/statement/queued为前缀时,可以用于重复调用getStatus接口或cancelQuery接口。当以/v1/statement/executing为前缀时,可以用于调用ExecutingStatementResource的相关接口;
  • cancelQuery(DELETE /v1/statement/queued/{queryId}/{slug}/{token}):取消已经提交的查询;

ExecutingStatementResource类负责处理处于Executing状态的查询,主要接口有:

  • getQueryResults(GET /v1/statement/executing/{queryId}/{slug}/{token}):获取查询结果,响应中,主要的元素有nextUri(用于进一步调用getQueryResults接口或cancelQuery接口),partialCancelUri(用于取消已经部分执行的查询,即下面的cancelPartial接口),columns(结果的列信息),data(查询结果数据);
  • cancelQuery(DELETE /v1/statement/executing/{queryId}/{slug}/{token}):用于取消已经处于Executing的查询;
  • partialCancel(DELETE /v1/statement/partialCancel/{queryId}/{stage}/{slug}/{token}):用于取消已经部分执行的查询;

    2.2 查询的总体执行流程

    基于在2.1节中介绍的Restful接口,一个查询的总体执行流程如图 1所示,分为以下几个步骤:
    01.png
    图 1 查询的总体执行流程
  1. 用户通过postStatement接口提交一个查询请求,QueuedStatementResource接受到该请求后,创建一个Query对象维护该请求的状态,Query对象的创建过程中,会请求DispatchManager分配一个全局唯一的queryId;
  2. Query创建后,QueuedStatementResource立即请求它的getQueryResults方法,此时由于DispatchManager仅仅只是为该查询分配了ID,而尚未进行分发,因此会立即返回一个nextUri(queuedUri),这个nextUri指向的其实就是QueuedStatementResource的getStatus接口;
  3. 用户通过上一步的nextUri调用getStatus接口之后,实际上调用的是Query对象的waitForDispatched接口,该接口则请求DispatchManager新建一个查询,并等待查询被分发,一旦分发完成,则返回一个nextUri(executingUri),这个nextUri指向的其实就是ExecutingStatementResource的getQueryResults接口;
  4. 一旦getQueryResults接口被调用,ExecutingStatementResource将创建一个protocol.Query对象以维护查询状态,并异步调用该对象的waitForResults以获取查询结果,当查询结果未就绪或者未全部返回,getQueryResults仍然会返回一个nextUri(executingUri),用户可以通过这个nextUri循环获取所有结果数据。

    2.3 分发查询

    从2.2节我们可以看到,一个查询最后是在DispatchManager的主导下进行分发。展示了DispatchManager进行分发的具体步骤。
    02.png
    图 2 查询的分发过程
    从图 2可以看到,查询的分发过程如下:

  5. 首先,DispatchManager请求QuerySessionSupplier,为查询创建一个session;

  6. 请求QueryPreparer,进而请求SqlParser,对查询语句进行语法分析,得到一个已解析的查询语句preparedQuery;
  7. 请求InternalResourceGroupManager,为查询分配一个资源组InternalResourceGroup,用于执行查询;
  8. 进行必要的事务相关处理;
  9. 当上述步骤完成之后,DispatchManager将前面几步产生的session,preparedQuery,resourceGroup(实际上是ResrouceGroup的ID)等,通通放入一个LocalDispatchQuery对象中;
  10. 将LocalDispatchQuery对象提交给InternalResourceGroupManager执行;

需要说明的是,目前Presto只有一个InternalResourceGroup,其ID为GLOBAL,所以所有查询都会使用该资源组。

2.4 查询的执行

从2.3节可以看到,封装了查询的session,preparedQuery,resourceGroup的LocalDispatchQuery,最后是被submit到了InternalResourceGroupManager。接下来,可以算是查询真正开始被执行了。图 3展示了这个过程。
03.png
图 3 查询的执行
图 3的查询执行过程大致可以分为3个阶段:
第一阶段:LocalDispatchQuery创建时,其实也同时创建了另外两个对象,一个是查询的状态机QueryStateMachine,另一个则是SqlQueryExecution。从名字很容易猜到,QueryStateMachine记录当前查询的状态。而SqlQueryExecution则是封装了与Sql执行相关的对象,包括SqlQueryExecution创建时被同时创建的Analyzer,以及下面第二阶段创建的LogicalPlanner,DistributedExecutionPlanner和SqlQueryScheduler。
第二阶段:LocalDispatchQuery被提交到InternalResourceGroupManager之后,后者实际上是调用InternalResourceGroup的run方法,让LocalDispatchQuery在资源组上面执行。这个阶段,会通过ClusterSizeMonitor等待足够数量的worker,然后依次创建LogicalPlanner,DistributedExecutionPlanner和SqlQueryScheduler。依靠这3个类,以及PlanFragmenter,Presto生成查询的执行计划,并根据计划将查询分发到不同的Worker节点上执行。
第三阶段:SqlQueryScheduler在查询完成之后,通知QueryStateMachine,并进一步通知InternalResourceGroup,查询已经完成。
从上面的描述,可以看到,整个过程的关键是第二阶段。第二阶段的LogicalPlanner,PlanFragmenter,DistributedExecutionPlanner和SqlQueryScheduler这几个类,完成了查询计划的生成,到查询的执行这一系列过程,下面是这几个类的简要介绍:

  • LogicalPlanner:负责生成逻辑执行计划;
  • PlanFragmenter:将LogicalPlanner生成的逻辑执行计划,拆分为多个子计划;
  • DistributedExecutionPlanner:将PlanFragmenter拆分好的子计划,进一步拆分成可以分配到不同Worker节点上运行的Stage;
  • SqlQueryScheduler:将Stage调度到不同的Worker节点上运行;

    2.5 查询执行过程的进一步分析

    2.1到2.4节主要是以用户向Presto提交查询语句作为一个分析的入口,分析了查询如何被Presto处理的大体流程。从前面的分析,我们可以知道,一个SQL查询,首先被2.3节的SqlParser解析器处理,接下来依次经过LogicalPlanner,PlanFragmenter,DistributedExecutionPlanner,生成一个分布式的执行计划,最后被SqlQueryScheduler调度到不同的节点上执行。本节将依照这个处理流程,分析其每个阶段的输入和输出。
    为了分析这个流程,本节将结合一个具体的例子,假定我们的Presto连接到了一个MySQL数据库(catalog的名字为test),MySQL数据库上面有一个Database,其名字为db,db中有一个名为tab的表。展示了db和tab的创建脚本。
    04.png
    图 4测试数据库的初始化脚本
    然后我们向Presto提交一个查询:
    1. select col1 from test.db.tab

    2.5.1 SQL的解析

    Presto使用了ANTLR4(一款开源的语法分析器生成工具)生成了Presto的SQL解析器SqlBaseParser(相关的语法定义在SqlBase.g4文件里面)。2.3节中提到的SqlParser,首先调用了SqlBaseParser对我们提交的查询语句进行解析,生成ANTLR4形式的AST(抽象语法树),然后再使用AstVisitor类,从树根开始,遍历这个AST,生成一个Presto用自己的类表达的语法树。图 5是从SqlBase.g4文件中,抽出的跟我们的示例查询相关的语法定义。而图 6是我们例子中的查询语句生成的对应的语法树。
    从图 5和图 6可以看到,图 6这棵语法树跟图 5的语法定义,基本上是对应的。这棵树的根节点是一个Query类的对象,对应的是语法定义中的query定义。它有一个成员body,指向一个QuerySpecification对象,对应语法定义中的querySpecification。而QuerySpecification有一个select成员,指向一个Select类的对象,Select类中有selectItems成员,对应语法定义中querySpecification里面可能出现的多个selectItem,以此类推。
    05.png
    图 5示例查询相关的ANTLR4语法定义
    06.png
    图 6示例查询对应的语法树

    2.5.2 逻辑执行计划的生成

    一旦图 6的语法树已经生成,LogicalPlanner将会据此生成逻辑执行计划。这个阶段分为两步执行,首先,LogicalPlanner对图 6的语法树进行从根节点开始的递归遍历,生成一个未经优化的逻辑计划,如图 7所示。图 7中的表遍历节点TableScanNode是在遍历到图 6的Table节点时生成的,4个映射节点ProjectNode,靠近TableScanNode的两个是在遍历QuerySpecification节点时生成的,另外两个是在遍历Query节点时生成的。最后的OutputNode,是遍历完语法树之后,再生成的输出节点。
    07.png
    图 7 示例查询对应的逻辑执行计划
    可以看到,未经优化的逻辑计划,其实包含非常多冗余的ProjectNode,这时候,LogicalPlanner会进行第二步:对计划进行一系列的优化。在LogicalPlanner类中,有一个planOptimizers列表,其中的每一个元素是一个优化器接口PlanOptimizer的实现。每个PlanOptimizer的实现都带有一个重写器Rewriter,用于对逻辑计划进行递归遍历,重写出新的,优化后的逻辑计划。LogicalPlanner循环地对上一步生成的逻辑计划应用planOptimizers列表的每一个优化器,最终得到图 8所示的优化过的执行计划。
    08.png
    图 8 示例查询对应的优化后的逻辑执行计划
    对于我们的示例查询在第一步生成的逻辑执行计划,真正生效的优化器只有两个:一个是IterativeOptimizer,另外一个是AddExchanges。IterativeOptimizer将逻辑计划中冗余的ProjectNode全部去掉了,这是IterativeOptimizer对RemoveRedundantIdentityProjections规则的应用。而AddExchanges优化器在OutputNode和TableScanNode之间,加上了一个ExchangeNode,用于在不同节点之间交换查询数据。
    去掉冗余的ProjectNode的好处是显而易见的:去掉多余的ProjectNode可以提高查询的执行效率。而之所以需要增加ExchangeNode,是因为我们的最终输出OutputNode需要在Coordinator上执行,而TableScanNode则一般需要调度到Worker上执行,所以两者之间,需要加上一个ExchangeNode以交换数据。

    2.5.3 执行计划的拆分

    Presto接下来会通过PlanFragmenter对优化后的逻辑执行计划进行拆分,分为若干个子计划SubPlan。并不例外,这也是对优化后的逻辑执行计划进行自顶向下的再一次递归遍历完成的。

09.png

图 9 拆分后的子计划
图 9是拆分好的子计划,可以看到,图 8中的逻辑执行计划被拆分为两个子计划。对于图 8的逻辑计划,Presto的拆分的逻辑是,将ExchangeNode转换为RemoteSourceNode,然后为ExchangeNode的sources中的每个元素,新建一个子计划SubPlan。在2.5.4节中我们将会看到,这么拆分可以使DistributedExecutionPlanner将ExchangeNode的sources对应的每一个SubPlan转换为一个Stage,然后分发到不同的Worker上执行。

2.5.4 分布式执行计划的生成

接下来,DistributedExecutionPlanner将上一小节拆分好的子计划,转换为分布式执行计划。DistributedExecutionPlanner的转换逻辑是:将每一个SubPlan转换为一个StageExecutionPlan。所以从图 10可以看到,分布式执行计划与图 9的拆分后的子计划是非常相似的。区别在于,对于那些fragment里面存在TableScanNode的StageExecutionPlan,它会额外维护一个splitSources。SplitSource定义了一个表如何被划分成若干个Batch,以便于后续并行处理。
10.png
图 10 分布式执行计划

2.5.5 执行计划的调度

接下来进入的是执行计划的实际调度阶段,流程如图 11所示。
11.png
图 11 执行计划的调度
SqlQueryScheduler在创建的时候,会为图 10的分布式执行计划中的每一个StateExecutionPlan创建一个对应的SqlStageExecution对象和对应的StageScheduler(为了保持简洁,图 11仅展示了一个SqlStageExecution和一个StageScheduler,但实际上,对应我们的示例查询,SqlStageExecution和StageScheduler应该各有两个,分别对应图 10的两个StateExecutionPlan。并且SqlQueryScheduler创建的是StageScheduler子类的实例,分别是FixedCountScheduler和SourcePartitionedScheduler)。
此后,SqlQueryScheduler通过AllAtOnceExecutionPolicy,创建AllAtOnceExecutionSchedule。AllAtOnceExecutionSchedule在SqlQueryScheduler调用其getStagesToSchedule时,会一次性返回全部未调度的SqlStageExecution的集合。SqlQueryScheduler接下来会遍历这个集合,并调用集合中每个SqlStageExecution对应的StageScheduler的schedule方法,这个方法最终会调用到SqlStageExecution的scheduleTask。 scheduleTask将会创建HttpRemoteTask,并通过HttpRemoteTask,以Restful的方式,将Stage发送到worker节点。此后的执行,将会在worker上处理。

3. 总结

在Presto中,coordinator负责接受查询请求,解析请求,生成执行计划并将计划拆分和调度到worker上执行。本文结合代码,分析了这个流程。