文章首发于:大数据进击之路
by cuteximi
Spark 数据本地化级别 - 图1
RDD 源码

大家可以看到源码中的第五条注释说明,翻译过来的大概意思是提供一系列的最佳计算位置。

我之前一直不太清楚 spark 是如何内部实现的,今天就带领大家来看一看 spark 的本地数据化级别在任务执行中的演变过程。

1 数据的本地化级别有哪些?

Spark 中任务的处理需要考虑数据的本地性,以 spark 1.6 为例,目前支持一下几种。(中英文排版很头疼,谁来帮帮我啊)

PROCESS_LOCAL 进程本地化,表示 task 要计算的数据在同一个 Executor 中。

NODE_LOCAL 节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFSD 上,那么最好的本地化级别就是 NODE_LOCAL。

NO_PREF 没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。

RACK_LOCAL 机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。

ANY 跨机架,数据在非同一机架的网络上,速度最慢。

如果不是很清楚,我画(造)了一张图放在这以供大家理解。

Spark 数据本地化级别 - 图2

2 Spark 的数据本地化由谁来负责呢?

  1. val rdd1 = sc.textFile("hdfs://tsl...")
  2. rdd1.cache()
  3. rdd1.map.filter.count()

上面这段简单的代码,背后其实做什么很多事情。Driver 的 TaskScheduler 在发送 task 之前,首先应该拿到 rdd1 数据所在的位置,rdd1 封装了这个文件所对应的 block 的位置,DAGScheduler 通过调用 getPrerredLocations() 拿到 partition 所对应的数据的位置,TaskScheduler 根据这些位置来发送相应的 task。

具体的解释:
DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

3

计算节点与输入数据位置的关系,下面以一个图来展开 spark 是如何让进行调度的。这一个过程会涉及 RDD, DAGScheduler , TaskScheduler。

Spark 数据本地化级别 - 图3
第一步:**PROCESS_LOCAL **
TaskScheduler 根据数据的位置向数据节点发送 task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:**NODE_LOCAL**
TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

第三步:**RACK_LOCAL**
TaskScheduler重新发送 task 到 worker2 中的 Executor1 中执行。

第四步:
当 task 分配完成之后,task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemote() 方法,通过 ConnectionManager 与原 task 所在节点的 BlockManager 中的 ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。这一步很像 shuffle 的文件寻址流程,Spark 的 shuffle 文件寻址流程

4
TaskScheduler在发送task的时候,会根据数据所在的节点发送task,这时候的数据本地化的级别是最高的,如果这个task在这个Executor中等待了三秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低一级数据本地化的级别,重新发送task到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别…
如果想让每一个 task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 task 都拿到了最好的数据本地化级别,但是我们 job 执行的时间也会随之延长。

下面是官方提供的参数说明:

Spark 数据本地化级别 - 图4

可以在代码里面这样设置:

  1. new SparkConf.set("spark.locality.wait","100")

如果对您有帮助,欢迎关注、转发。

Spark 数据本地化级别 - 图5