站酷 | 圣诞
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
1
本文的主题是讲一下Spark 的shuffle 寻址过程,在这之前我们先来回忆一下MapReduce 的 shuffle 过程。
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。
下面是 MapReduce 的 shuffle 过程,数据被拉取到不同的节点上进行聚合处理,会产生大量的磁盘和网络IO。
Spark 也有自己的 shuffle 过程。下图是 DAG schedula的任务划分,从最后一个RDD往前追溯,遇到宽依赖(shuffle)就划分一个 Stage。
在DAG 调度的过程中,在划分 Stage 的时候,构建 shuffleDependency 的时候进行shuffle注册,获取后续数据读取所有要的 shuffleHandle,最终每一个 job 提交之后都会生成一个 ResultStage 和若干个 ShuffleMapStage ,其中 ResultStage 表示生成作业的最终结果所在的 Stage, ResultStage 与 shuffleMapStage 中的 task 分别对应着 ResultTask 与shuffleMapTask ,一个作业,除了最终的 ResultStage 外,其他若干 shuffleMapStage 中的各个 shuffleMapTask 都需要将最终的数据根据相应的 Partition 对数据进行分组,然后持久化分区的数据。
2
回到本文的主题,shuffle 文件寻址,先来明确几个概念:
MapOutPutTracker:MapOutPutTracker 是 spark 里面的一个模块,主从架构,用来管理磁盘小文件的地址。
MapOutPutTrackerMaster 是主,存在于 Driver 中;
MapOutPutTrackerWorker 是从,存在于 Executor 中;
BlockManager:块管理者,也是一个 spark 中的一个模块,主从架构。
BlockManagerMaster 是主,存在于 Driver 中。用于在集群中传递广播变量或缓存数据或删除数据的时候通知其他的 跟随节点来进行相应的操作。说白了就是指挥。
BlockManagerWorker是从,存在于 Executor 中。会与 BlockManagerMaster节点进行通信。
无论在 Driver 端的 BlockManager 还是在 Excutor 端的BlockManager 都含有四个对象:
① DiskStore:负责磁盘的管理。
② MemoryStore:负责内存的管理。
③ConnectionManager负责连接其他BlockManagerWorker。
④ BlockTransferService:负责数据的传输。
3
shuffle 寻址流程:
下面先来看一张图片,大家可以试着跟着图片来理解 shuffle 的文件寻址。
1.map task运行完毕之后,会将 task 执行之后的产生的磁盘小文件的地址封装到 MapStatus 对象中。通过 MapOutpuTrackerWorker对象向 Driver 中的 MapOutputTrackerMaster 汇报。
2.在所有的 map task 执行完毕后,Driver 中就掌握了所有的磁盘小文件的地址。
3.在 reduce task 执行之前,会通过Executor 中MapOutPutTrackerWorker 的 Driver 端的 MapOutputTrackerMaster 获取磁盘小文件的地址。
**
4.获取到磁盘小文件的地址后,会通过 BlockManager 中的 ConnectionManager 连接数据所在节点上的 ConnectionManager, 然后通过 BlockTransferService 进行数据的传输。
- BlockTransferService 默认启动 5 个 task 去节点拉取数据。默认情况下,5 个 task 拉取数据量不能超过 48 M。拉取过来的数据放在 Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction 0.2), 如果5 个 task 一次拉取的数据放不到shuffle内存中会有 OOM,如果放下一次,不会有 OOM,以后放不下的会放磁盘。
如何避免OOM
1、拉取数据 少一些。
2、提高ExecutorShuffle聚合内存。
3、提高executor内存。
这里就涉及到 Spark 的内存管理了,这里只是简单的提一下。Spark在Spark1.6之前使用的是静态内存管理,spark1.6之后使用的是统一内存管理,spark.memory.useLegacyMode false(统一内存管理),与静态内存管理的区别在于Storage和Execution共享同一块内存空间,可以动态占用对方的空闲区域 。
为了保持本文的格式,我保存成图片放上来了。微信排版对于技术文章不是很友好。
关于 shuffle 还有很多需要注意的地方,我将在之后的文章介绍一下 shuffle 的调优。
如果对您有帮助,欢迎点 hao kan、转发。