6.1.1 整体概念
Apache Spark 是一个开源的通用集群计算系统,它提供了 High-level 编程 API,支持 Scala、Java 和 Python 三种编程语言。Spark 内核使用 Scala 语言编写,通过基于 Scala 的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。
6.1.2 RDD 抽象
RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD 具有 5 个重要的特性,如下图所示:
上图展示了 2 个 RDD 进行 JOIN 操作,体现了 RDD 所具备的 5 个主要特性,如下所示:
- 一组分区。
- 计算每一个数据分片的函数。
- RDD 上的一组依赖。
- 可选,对于键值对 RDD,有一个 Partitioner(通常是 HashPartitioner)。
- 可选,一组 Preferred location 信息(例如,HDFS 文件的 Block 所在 location 信息)。
6.1.3 计算抽象
在描述 Spark 中的计算抽象,我们首先需要了解如下几个概念:
- Application:用户编写的 Spark 程序,完成一个计算任务的处理。它是由一个 Driver 程序和一组运行于 Spark 集群上的 Executor 组成。
- Job:用户程序中,每次调用 Action 时,逻辑上会生成一个 Job,一个 Job 包含了多个 Stage。
- Stage:Stage 包括 ShuffleMapStage 和 ResultStage 两类,如果用户程序中调用了需进行 Shuffle 计算的 Operator,如 groupByKey 等,就会以 Shuffle 为边界分成 ShuffleMapStage 和 ResultStage。
- TaskSet:基于 Stage 可以直接映射为 TaskSet,一个 TaskSet 封装了一次需要运算的、具有相同处理逻辑的 Task,这些 Task 可以并行计算,粗粒度的调度是以 TaskSet 为单位的。
- Task:Task 是在物理节点上运行的基本单位,Task 包含两类:ShuffleMapTask 和 ResultTask,分别对应于 Stage 中 ShuffleMapStage 和 ResultStage 中的一个执行基本单元。
下面,我们看一下,上面这些基本概念之间的关系,如下图所示:
上图,为了简单,每个 Job 假设都很简单,并且只需要进行一次 Shuffle 处理,所以都对应 2 个 Stage。实际应用中,一个 Job 可能包含若干个 Stage,或者是一个相对复杂的 Stage DAG。在 Standalone 模式下,默认使用的是 FIFO 这种简单的调度策略,在进行调度的过程中,大概流程如下图所示。
从用户提交 Spark 程序,最终生成 TaskSet,而在调度时,通过 TaskSetManager 来管理一个 TaskSet(包含一组可在物理节点上执行的Task),这里面TaskSet 必须要按照顺序执行才能保证计算结果的正确性,因为 TaskSet 之间是有序依赖的(上溯到 ShuffleMapStage 和 ResultStage),只有一个 TaskSet 中的所有 Task 都运行完成后,才能调度下一个 TaskSet 中的 Task 去执行。
先从 Executor 和 SchedulerBackend 说起。Executor 是真正执行任务的进程,本身拥有若干 cpu 和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend 是 spark 提供的接口,定义了许多与 Executor 事件相关的处理,包括:新的 executor 注册进来的时候记录 executor 的信息,增加全局的资源量(核数),进行一次 makeOffer;executor 更新状态,若任务完成的话,回收 core,进行一次 makeOffer;其他停止 executor、remove executor 等事件。
makeOffer 的目的是在有资源更新的情况下,通过调用 scheduler 的 resourceOffers 方法来触发它对现有的任务进行一次分配,最终 launch 新的 tasks。这里的全局 scheduler 就是 TaskScheduler,实现是 TaskSchedulerImpl,它可以对接各种 SchedulerBackend 的实现,包括 standalone、yarn、mesos。SchedulerBackend 在做 makeOffer 的时候,会把现有的 executor 资源以 WorkerOfffer 列表的方式传给 scheduler,即以 worker 为单位,将 worker 信息及其内的资源交给 scheduler。scheduler 拿到这一些集群的资源后,去遍历已提交的 tasks 并根据 locality 决定如何 launch tasks。
TaskScheduler 里,resourceOffers 方法会将已经提交的 tasks 进行一次优先级排**序,这个排序算法目前是两种:FIFO 或 FAIR。得到这一份待运行的 tasks 后,接下里就是要把 schedulerBackend 交过来的 worker 资源信息合理分配给这些 tasks。为了避免每次都是前几个 worker 被分到 tasks,所以先对 WorkerOffer 列表进行一次随机洗牌。接下来就是遍历 tasks,看 workers 的资源“够不够”,“符不符合”task,ok 的话 task 就被正式 launch** 起来。
资源“够不够”是很好判断的,在 TaskScheduler 里设置了每个 task 启动需要的 cpu 个数,默认是1,所以只需要做核数的大小判断和减 1 操作就可以遍历分配下去。“符不符合”这件事情,取决每个 tasks 的 locality 设置。task 的 locality 有五种,按优先级高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同个进程里,次好是同个 node(即机器)上,再次是同机架,或任意都行。task 有自己的 locality,如果本次资源里没有想要的 locality 资源,怎么办呢?spark 有一个 spark.locality.wait
参数,默认是 3000ms。对于 process,node,rack,默认都使用这个时间作为 locality 资源的等待时间。所以一旦 task 需要 locality,就可能会触发 delay scheduling。
spark 的一个执行流程梳理。这件事情要从 Spark 的 DAG 切割说起。Spark RDD 通过其 transaction 和 action 操作,串起来形成了一个 DAG。action 的调用,触发了 DAG 的提交和整个 job 的执行。触发之后,由 DAGScheduler 这个全局唯一的面向 stage 的 DAG 调度器来切分 DAG,根据是否 shuffle 来切成多个小 DAG,即 stage。凡是 RDD 之间是窄依赖的,都归到一个 stage 里,这里面的每个操作都对应成 MapTask,并行度就是各自 RDD 的 partition 数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个 stage,这里面的操作对应成 ResultTask,结果 RDD 的 partition 数就是并行度。MapTask 和 ResultTask 分别可以简单理解为传统 MR 的 Map 和 Reduce,切分他们的依据本质上就是 shuffle。所以 shuffle 之前,大量的 map 是可以同 partition 内操作的。每个 stage 对应的是多个 MapTask 或多个 ResultTask,这一个 stage 内的 task 集合成一个 TaskSet 类,由 TaskSetManager 来管理这些 task 的运行状态,locality 处理(比如需要 delay scheduling)。这个TaskSetManager 是 Spark 层面上的,如何管理自己的 tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的 TaskSetManager 的 resourceOffer 方法,是 task 与底下资源的交互,这个资源交互的协调人是 TaskScheduler,也是全局的,TaskScheduler 对接的是不同的 SchedulerBackend 的实现(比如 mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是 worker 上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark 本身计算框架内管理线程级别的 task,每个 stage 都有一个 TaskSet,本身是个小 DAG,可以丢到全局可用的资源池里跑;spark 下半身的双层资源管理部分掌控的是进程级别的 executor,不关心 task 怎么摆放,也不关心 task 运行状态,这是 TaskSetManager 管理的事情,两者的协调者就是 TaskScheduler 及其内的 SchedulerBackend 实现。
SchedulerBackend 的实现,除去 local 模式的不说,分为细粒度和粗粒度两种。粗粒度与细粒度的主要区别,就是粗粒度是进程 long-running 的,计算线程可以调到 executor 上跑,但 executor 的 cpu 和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是 AMPLab 论文里最先提出来并在 Mesos 里实现的。所以 standalone 模式下,根据 RDD 的 partition 数,以及每个 task 需要的 cpu 数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道 TaskSet 要分几批才能跑完一个 stage。
6.1.4 集群模式
Spark 集群在设计的时候,并没有在资源管理的设计上对外封闭,而是充分考虑了未来对接一些更强大的资源管理系统,如 YARN、Mesos 等,所以 Spark 架构设计将资源管理单独抽象出一层,通过这种抽象能够构建一种适合企业当前技术栈的插件式资源管理模块,从而为不同的计算场景提供不同的资源分配与调度策略。
上图中,Spark集群Cluster Manager目前支持如下三种模式:
- Standalone 模式:Standalone 模式是 Spark 内部默认实现的一种集群管理模式,这种模式是通过集群中的 Master 来统一管理资源,而与 Master 进行资源请求协商的是 Driver 内部的StandaloneSchedulerBackend(实际上是其内部的 StandaloneAppClient 真正与 Master 通信),后面会详细说明。
- YARN 模式:YARN 模式下,可以将资源的管理统一交给 YARN 集群的 ResourceManager 去管理,选择这种模式,可以更大限度的适应企业内部已有的技术栈,如果企业内部已经在使用 Hadoop 技术构建大数据处理平台。
- Mesos 模式:随着 Apache Mesos 的不断成熟,一些企业已经在尝试使用 Mesos 构建数据中心的操作系统(DCOS),Spark 构建在 Mesos 之上,能够支持细粒度、粗粒度的资源调度策略(Mesos的优势),也可以更好地适应企业内部已有技术栈。
如何能够保证 Spark 非常容易的让第三方资源管理系统轻松地接入进来。
可以看出,Task 调度直接依赖 SchedulerBackend,SchedulerBackend 与实际资源管理模块交互实现资源请求。这里面,CoarseGrainedSchedulerBackend 是 Spark 中与资源调度相关的最重要的抽象,它需要抽象出与 TaskScheduler 通信的逻辑,同时还要能够与各种不同的第三方资源管理系统无缝地交互。实际上 CoarseGrainedSchedulerBackend 内部采用了一种 ResourceOffer 的方式来处理资源请求。
6.1.5 RPC 网络通信抽象
Spark RPC 层是基于优秀的网络通信框架 Netty 设计开发的,但是 Spark 提供了一种很好地抽象方式,将底层的通信细节屏蔽起来,而且也能够基于此来设计满足扩展性,比如,如果有其他不基于 Netty 的网络通信框架的新的 RPC 接入需求,可以很好地扩展而不影响上层的设计。RPC 层设计,如下图类图所示:
任何两个 Endpoint 只能通过消息进行通信,可以实现一个 RpcEndpoint 和一个 RpcEndpointRef。想要与 RpcEndpoint 通信,需要获取到该 RpcEndpoint 对应的 RpcEndpointRef 即可,而且管理 RpcEndpoint 和 RpcEndpointRef 创建及其通信的逻辑,统一在 RpcEnv 对象中管理。
相关概念:
- RpcEndpoint:表示一个个需要通信的个体(如 master,worker,driver),主要根据接收的消息来进行对应的处理。一个 RpcEndpoint 经历的过程依次是:构建->onStart→receive→onStop。其中 onStart 在接收任务消息前调用,receive 和 receiveAndReply 分别用来接收另一个 RpcEndpoint(也可以是本身)send 和 ask 过来的消息。RpcEndpoint 注意三个方法,receive:改方法被子类实现,用于接收其他节点发送的消息;receive-AndReply:该方法被子类实现,用于接收并回复其他节点发送的消息;onStart:该方法被子类实现,该方法在端口启动的时候自动调用。
- RpcEndpointRef:RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该 RpcEndpoint 的引用,然后通过该应用发送消息。
- RpcAddress:表示远程的 RpcEndpointRef 的地址,Host + Port。
- RpcEnv:RpcEnv 为 RpcEndpoint 提供处理消息的环境。RpcEnv 负责 RpcEndpoint 整个生命周期的管理,包括:注册 endpoint,endpoint 之间消息的路由,以及停止 endpoint。
6.1.6 启动 Standalone 集群
Standalone 模式下,Spark 集群采用简单的 Master-Slave 架构模式,Master 统一管理所有的 Worker,这种模式很常见,我们简单地看下 Spark Standalone 集群启动的基本流程,如下图。
可以看到 Spark 集群采用的消息的模式进行通信,即 EDA 架构模式,借助于 RPC 层的优雅设计,任何两个 Endpoint 想要通信,发送消息并携带数据即可。上图的流程描述如下所示:
- Master 启动时首先创一个 RpcEnv 对象,负责管理所有通信逻辑。
- Master 通过 RpcEnv 对象创建一个 Endpoint,Master 就是一个 Endpoint,Worker 可以与其进行通信。
- Worker 启动时也是创一个 RpcEnv 对象。
- Worker 通过 RpcEnv 对象创建一个 Endpoint。
- Worker 通过 RpcEnv 对象建立到 Master 的连接,获取到一个 RpcEndpointRef 对象,通过该对象可以与 Master 通信。
- Worker 向 Master 注册,注册内容包括主机名、端口、CPU Core 数量、内存数量。
- Master 接收到 Worker 的注册,将注册信息维护在内存中的 Table 中,其中还包含了一个到 Worker 的 RpcEndpointRef 对象引用。
- Master 回复 Worker 已经接收到注册,告知 Worker 已经注册成功。
- 此时如果有用户提交 Spark 程序,Master 需要协调启动 Driver;而 Worker 端收到成功注册响应后,开始周期性向 Master 发送心跳。
6.1.7 核心组件
集群处理计算任务的运行时(即用户提交了 Spark 程序),最核心的顶层组件就是 Driver 和 Executor,它们内部管理很多重要的组件来协同完成计算任务,核心组件栈如下图所示:
Driver 和 Executor 都是运行时创建的组件,一旦用户程序运行结束,他们都会释放资源,等待下一个用户程序提交到集群而进行后续调度。上图,我们列出了大多数组件,其中 SparkEnv 是一个重量级组件,他们内部包含计算过程中需要的主要组件,而且,Driver 和 Executor 共同需要的组件在 SparkEnv 中也包含了很多。这里,我们不做过多详述,后面交互流程等处会说明大部分组件负责的功能。
6.1.8 核心组件交互流程
在 standalone 模式中,没有 AM 和 NM 的概念,也没有 RM 的概念,用户节点直接与 master 打交道,由 driver 负责向 master 申请资源,并由 driver 进行资源的分配和调度等等。首先,为了理解组件之间的主要交互流程,我们给出一些基本要点:
- 一个 Application 会启动一个 Driver。
- 一个 Driver 负责跟踪管理该 Application 运行过程中所有的资源状态和任务状态。
- 一个 Driver 会管理一组 Executor。
- 一个 Executor 只执行属于一个 Driver 的 Task。
核心组件之间的主要交互流程,如下图所示:
上图中,通过不同颜色或类型的线条,给出了如下 6 个核心的交互流程:
- 橙色:提交用户 Spark 程序。用户提交一个 Spark 程序,主要的流程如下所示:
- 用户 spark-submit 脚本提交一个 Spark 程序,会创建一个 ClientEndpoint 对象,该对象负责与 Master 通信交互。
- ClientEndpoint 向 Master 发送一个 RequestSubmitDriver 消息,表示提交用户程序。
- Master 收到 RequestSubmitDriver 消息,向 ClientEndpoint 回复 SubmitDriverResponse,表示用户程序已经完成注册。
- ClientEndpoint向Master 发送 RequestDriverStatus 消息,请求 Driver 状态。
- 如果当前用户程序对应的 Driver 已经启动,则 ClientEndpoint 直接退出,完成提交用户程序。
- 紫色:启动 Driver 进程。当用户提交用户 Spark 程序后,需要启动 Driver 来处理用户程序的计算逻辑,完成计算任务,这时 Master 协调需要启动一个 Driver,具体流程如下所示:
- Maser 内存中维护着用户提交计算的任务 Application,每次内存结构变更都会触发调度,向 Worker 发送 LaunchDriver 请求。
- Worker 收到 LaunchDriver 消息,会启动一个 DriverRunner 线程去执行 LaunchDriver 的任务。
- DriverRunner 线程在 Worker 上启动一个新的 JVM 实例,该 JVM 实例内运行一个 Driver 进程,该 Driver 会创建 SparkContext 对象。
- 红色:注册 Application。Dirver 启动以后,它会创建 SparkContext 对象,初始化计算过程中必需的基本组件,并向 Master 注册 Application,流程描述如下:
- 创建 SparkEnv 对象,创建并管理一些基本组件。
- 创建 TaskScheduler,负责 Task 调度。
- 创建 StandaloneSchedulerBackend,负责与 ClusterManager 进行资源协商。
- 创建 DriverEndpoint,其它组件可以与 Driver 进行通信。
- 在 StandaloneSchedulerBackend 内部创建一个 StandaloneAppClient,负责处理与 Master 通信交互。
- StandaloneAppClient 创建一个 ClientEndpoint,实际负责与 Master 通信。
- ClientEndpoint 向 Master 发送 RegisterApplication 消息,注册 Application。
- Master 收到 RegisterApplication 请求后,回复 ClientEndpoint 一个 RegisteredApplication 消息,表示已经注册成功。
- 蓝色:启动 Executor 进程。
- Master 向 Worker 发送 LaunchExecutor 消息,请求启动 Executor;同时 Master 会向 Driver 发送 ExecutorAdded 消息,表示 Master 已经新增了一个 Executor(此时还未启动)。
- Worker 收到 LaunchExecutor 消息,会启动一个 ExecutorRunner 线程去执行 LaunchExecutor 的任务。
- Worker 向 Master 发送 ExecutorStageChanged 消息,通知 Executor 状态已发生变化。
- Master 向 Driver 发送 ExecutorUpdated 消息,此时 Executor 已经启动。
- 粉色:启动 Task 执行。
- StandaloneSchedulerBackend 启动一个 DriverEndpoint。
- DriverEndpoint 启动后,会周期性地检查 Driver 维护的 Executor 的状态,如果有空闲的 Executor 便会调度任务执行。
- DriverEndpoint 向 TaskScheduler 发送 ResourceOffer 请求。
- 如果有可用资源启动 Task,则 DriverEndpoint 向 Executor 发送 LaunchTask 请求。
- Executor 进程内部的 CoarseGrainedExecutorBackend 调用内部的 Executor 线程的 launchTask 方法启动 Task。
- Executor 线程内部维护一个线程池,创建一个 TaskRunner 线程并提交到线程池执行。
- 绿色:Task 运行完成。
- Executor 进程内部的 Executor 线程通知 CoarseGrainedExecutorBackend,Task 运行完成。
- CoarseGrainedExecutorBackend 向 DriverEndpoint 发送 StatusUpdated 消息,通知 Driver 运行的 Task 状态发生变更。
- StandaloneSchedulerBackend 调用 TaskScheduler 的 updateStatus 方法更新 Task 状态。
- StandaloneSchedulerBackend 继续调用 TaskScheduler 的 resourceOffers 方法,调度其他任务运行。
6.1.9 Block 管理
Block 管理,主要是为 Spark 提供的 Broadcast 机制提供服务支撑的。Spark 中内置采用 TorrentBroadcast 实现,该 Broadcast 变量对应的数据(Task 数据)或数据集(如 RDD),默认会被切分成若干 4M 大小的 Block,Task 运行过程中读取到该 Broadcast 变量,会以 4M 为单位的 Block 为拉取数据的最小单位,最后将所有的 Block 合并成 Broadcast 变量对应的完整数据或数据集。将数据切分成 4M 大小的 Block,Task 从多个 Executor 拉取 Block,可以非常好地均衡网络传输负载,提高整个计算集群的稳定性。
通常,用户程序在编写过程中,会对某个变量进行 Broadcast,该变量称为 Broadcast 变量。在实际物理节点的 Executor 上执行 Task 时,需要读取 Broadcast 变量对应的数据集,那么此时会根据需要拉取 DAG 执行流上游已经生成的数据集。采用 Broadcast 机制,可以有效地降低数据在计算集群环境中传输的开销。具体地,如果一个用户对应的程序中的 Broadcast 变量,对应着一个数据集,它在计算过程中需要拉取对应的数据,如果在同一个物理节点上运行着多个 Task,多个 Task 都需要该数据,有了 Broadcast 机制,只需要拉取一份存储在本地物理机磁盘即可,供多个 Task 计算共享。
另外,用户程序在进行调度过程中,会根据调度策略将 Task 计算逻辑数据(代码)移动到对应的 Worker 节点上,最优情况是对本地数据进行处理,那么代码(序列化格式)也需要在网络上传输,也是通过 Broadcast 机制进行传输,不过这种方式是首先将代码序列化到 Driver 所在 Worker 节点,后续如果 Task 在其他 Worker 中执行,需要读取对应代码的 Broadcast 变量,首先就是从 Driver 上拉取代码数据,接着其他晚一些被调度的 Task 可能直接从其他 Worker 上的 Executor 中拉取代码数据。
我们通过以 Broadcast 变量 taskBinary 为例,说明 Block 是如何管理的,如下图所示:
上图中,Driver 负责管理所有的 Broadcast 变量对应的数据所在的 Executor,即一个 Executor 维护一个 Block 列表。在 Executor 中运行一个 Task 时,执行到对应的 Broadcast 变量 taskBinary,如果本地没有对应的数据,则会向 Driver 请求获取 Broadcast 变量对应的数据,包括一个或多个 Block 所在的 Executor 列表,然后该 Executor 根据 Driver 返回的 Executor 列表,直接通过底层的 BlockTransferService 组件向对应 Executor 请求拉取 Block。Executor 拉取到的 Block 会缓存到本地,同时向 Driver 报告该 Executor 上存在的 Block 信息,以供其他 Executor 执行 Task 时获取 Broadcast 变量对应的数据。
6.1.10 整体应用
用户通过 spark-submit 提交或者运行 spark-shell REPL,集群创建 Driver,Driver 加载 Application,最后 Application 根据用户代码转化为 RDD,RDD 分解为 Tasks,Executor 执行 Task 等,整体交互蓝图如下:
流程:
- Client 运行时向 Master 发送启动驱动申请(发送 RequestSubmitDriver 指令)。
- Master 调度可用 Worker 资源进行驱动安装(发送 LaunchDriver 指令)。
- Worker 运行 DriverRunner 进行驱动加载,并向 Master 发送应用注册请求(发送 RegisterApplication 指令)。
- Master 调度可用 Worker 资源进行应用的 Executor 安装(发送 LaunchExecutor 指令)。
- Executor 安装完毕后向 Driver 注册驱动可用 Executor 资源(发送 RegisterExecutor指令)。
- 最后是运行用户代码时,通过 DAGScheduler,TaskScheduler 封装为可以执行的 TaskSetManager 对象。
- TaskSetManager 对象与 Driver 中的 Executor 资源进行匹配,在队形的 Executor 中发布任务(发送 LaunchTask 指令)。
- TaskRunner 执行完毕后,调用 DriverRunner 提交给 DAGScheduler,循环(第七步)直到任务完成。