分布式计算框架Mapreduce详解
MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。例如:Java、Python和C++语言等。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的运行商。MapReduce的优势在于处理大规模数据集
从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。用户只需编写map()和reduce()两个函数,即可完成简单的分布式程序设计
- map()函数以key/value对作为输出,产生另外一系列key/value对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚合,且key值相同的数据被统一交给reduce()函数处理。
- reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列key/value对作为最终输出写入HDFS。
设计目的
- 易于编程
- 良好的扩展性
- 高容错性
Shuffle
1.概述
MapReducer中,map阶段处理的数据如何传递给reducer阶段,是MapReducer框架中最关键的流程,这个流程就叫做shuffle。shuffle就是洗牌的过程,它会将数据进行分组排序。具体来说它就是将 maptask 输出的处理结果数据,分发给 reducetask,并在分发的过程中,对数据按 key 进行了分组和排序
2.主要流程
- map端
- map函数开始产生输出时,利用缓冲的方式写到内存,并出于效率的考虑进行预排序。
- 每个map任务都有一个环形内存缓冲区,用于存储任务的输出,默认情况是100MB,一旦缓冲内容达到阈值(默认是80%),一个后台线程开始把内容写到磁盘中。在写磁盘过程中,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到写磁盘过程完成。
- 在写磁盘之前,线程首先根据数据最终要传送到reducer把数据划分成相应的分区,在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行
- 一旦内存缓冲区达到溢出写的阈值,就会新建一个溢出写文件,因此在map任务写完其最后一个输出记录之后,会有几个溢出写文件。在任务完成之前,溢出写文件被合并成一个已分区且已排序的输出文件
- reducer端
- reducer通过HTTP方式得到输出文件的分区,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程
- 与map端一样,reduce端将数据复制过来的时候先被复制到内存中,当内存中的数据量到达一定阈值,就启动内存到磁盘的merge,也就是溢写文件,然后会在磁盘中生成了众多的溢写文件,直到没有map端的数据输出为止,然后进行大合并,将多个溢写文件合并成一个文件,最后,直接把数据输入reduce函数,然后输出
YARN平台
1.产生背景
- 扩展性差:难以支持MR之外的计算
- 资源利用率低:不能将硬件资源利用到最高
可靠性差:NameNode单点故障
2.架构与组件ResourceManager
- ResourceManager 是基于应用程序对集群资源的需求进行调度的 Yarn 集群主控节点,负责协调和管理整个集群(所有 NodeManager)的资源,通过AppMaster响应用户提交的不同类型应用程序的解析,调度,监控等工作。ResourceManager 会为每一个 Application 启动一个 ApplicationMaster, 并且 ApplicationMaster 分散在各个 NodeManager 节点
- NodeManager
- 管理每一个节点的资源,并上报给RM。
- NodeManager 是 YARN 集群当中真正资源的提供者,是真正执行应用程序的容器的提供者, 监控应用程序的资源使用情况,并通过心跳向集群资源调度器 ResourceManager 进行汇报。
- ApplicationMaster
- ApplicationMaster 对应一个应用程序,职责是向资源调度器申请执行任务的资源容器,监控整个任务的执行,跟踪整个任务的状态,处理任务失败以异常情况
- Container
- Container 是一个抽象出来的逻辑资源单位。它封装了一个节点上的 CPU,内存,磁盘,网络等信息,MapReduce 程序的所有 task 都是在这个容器里执行完成的
3.作业运行过程
(JVM是java里的虚拟机)
- 作业提交
- Client端发出请求运行一个Job(第1步),请求资源管理器(ResourceManager),获取一个作业ID(应用ID)(第2步),作业的客户端核实作业的输出, 计算输入的split, 将作业的资源(包括Jar包, 配置文件, split信息)拷贝给HDFS(第3步),最后, 通过调用资源管理器的submitApplication()方法来提交作业(第4步)
- 作业初始化
- 当RM资源管理器收到submitApplication()提交申请的请求时, RM判断哪个NN比较空闲,就将该请求发给调度器(scheduler), 调度器分配第一个container, 并与对应的 NodeManager 通讯,然后资源管理器在该container内启动应用管理器(Application Master)进程, 由节点管理器(NodeManager)监控(第5a和5b步)
- MapReduce作业的application master是一个Java应用程序,它的主类是MRAppMaster。MRAppMaster对作业进行初始化:通过创建多个对象以保持对作业进度的跟踪,因为他将接受来自任务的进度和完成报告(第6步)
- MRAppMaster接受来自共享文件系统的在客户端计算的输入分片(第7步),去HDFS拿取作业的具体要求,比如运行什么程序,在什么环境等。(去HDFS拿取作业的具体要求,比如运行什么程序,在什么环境等)对每一个分片创建map任务对象以及reduce任务对象。
- 任务分配
- 如果不是小作业, 那么MRAppMaster应用管理器向RM资源管理器采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源,MRAppMaster请求第二个container来运行所有的maptask和reducetask任务(第8步),每个任务对应一个container,且只能在该container上运行,这些请求是通过心跳来传输它们的状态。如果当前的NN空闲,就在当前的NN上运行,如果当前的NN比较忙,就去另外的NN上运行。
- 任务执行
- MRAppMaster 申请到任务的资源后,资源管理器的调度器为NN分配container,MRAppMaster与对应的 NodeManager 通讯, MRAppMaster应用管理器通过练习节点(NodeManager)管理器来启动container(第9a步和9b步).
- 任务有一个主类为YarnChild的Java应用执行. 在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及分布式缓存的所有文件(第10步).
- NodeManager 为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
- 最后, 运行map或reduce任务(第11步).
- 进度和状态更新
- YARN中的任务将其进度和状态(包括counter)返回给AppMaster应用管理器, 后者通过每3秒的脐带接口有整个作业的视图。各个任务通过某个 RPC 协议向 MRAppMaster 汇报自己的状态和进度,以让 MRAppMaster随时掌握各个任务的运行状态,从而可以在任务败的时候重新启动任务。
- 作业完成
- 除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。应用程序运行完成后,MRAppMaster 向 ResourceManager 注销并关闭自己。