之前大数据组件关系一文中讲到hadoop2.0后,hadoop由HDFS、Yarn、MapReduce这三个组件构成,那么现在来讲讲他们的执行过程。参考了链接。
这里主要讲结构,不讲启动通信之类的。先认识了解结构了,后续阅读源码的时候就知道哪些是重点了,逻辑就清晰了。
HDFS
namenode
- 管理文件系统命名空间和客户端对文件访问
- 保存文件元数据(文件信息、文件拆分block块信息、以及block和DataNode的信息)
-
datanode
保存具体的block数据
- 负责数据的读写操作和复制操作
- 向NameNode报告当前存储或者修改的数据信息
-
block
基本存储单位(一般64M)
- 一个大文件会被拆分成多个block块,然后存储到不同机器上
- 每块会备份到其他机器上,保证数据安全性,防止数据丢失(默认备份3份)。
执行流程
文件写入步骤
- Client向NameNode发起文件写入请求
- NameNode根据文件大小和文件块配置情况,返回给Client部分DataNode的信息
- Client将文件划分多个block块,并根据DataNode地址信息,按照顺序写入每一个DataNode块中。
- 文件读取步骤
Client向NameNode发起文件读取请求。
- NameNode返回文件存储的block块信息,以及block块所在的DataNode的信息
Client根据返回的DataNode和block信息,读取文件信息
数据备份
NameNode负责管理block块的复制,它周期性地接收集群中所有DataNode的心跳数据包和Blockreport。心跳包表示DataNode正常工作,Blockreport描述了该DataNode上所有的block组成的列表。
- HDFS采用一种称为rack-aware的策略来决定备份数据的存放。通过一个称为Rack Awareness的过程,NameNode决定每个DataNode所属rack id。缺省情况下,一个block块会有三个备份,一个在NameNode指定的DataNode上,一个在指定DataNode非同一rack的DataNode上,一个在指定DataNode同一rack的DataNode上。这种策略综合考虑了同一rack失效、以及不同rack之间数据复制性能问题。
- 为了降低整体的带宽消耗和读取延时,HDFS会尽量读取最近的副本。如果在同一个rack上有一个副本,那么就读该副本。如果一个HDFS集群跨越多个数据中心,那么将首先尝试读本地数据中心的副本。
YARN
主要成分:ResourceManager、NodeManager、Application Master、ContainerResourceManager
ResourceManager是master上的进程,负责整个分布式系统的资源管理和调度。他会处理来自client端的请求(包括提交作业/杀死作业);启动/监控Application Master;监控NodeManager的情况,比如可能挂掉的NodeManager。NodeManager
相对应的,NodeManager时处在slave节点上的进程,他只负责当前slave节点的资源管理和调度,以及task的运行。他会定期向ResourceManager回报资源/Container的情况(heartbeat);接受来自ResourceManager对于Container的启停命令。Application Master
每一个提交到集群的作业都会有一个与之对应的Application Master来负责应用程序的管理。他负责进行数据切分;为当前应用程序向ResourceManager去申请资源(也就是Container),并分配给具体的任务;与NodeManager通信,用来启停具体的任务,任务运行在Container中;而任务的监控和容错也是由Application Master来负责的。Container
它包含了Application Master向ResourceManager申请的计算资源,比如说CPU/内存的大小,以及任务运行所需的环境变量和队任务运行情况的描述
执行流程
MapReduce
什么是MapReduce
MapReduce分布式计算的计算方式指定一个Map函数,用来吧一组键值对映射成一组新的键值对,并指定并发的Reduce函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
完整工作流程图
流程详细描述
MapTask流程
Map Task 各阶段描述
- Read 阶段: Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
- Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。
- Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
扩展–溢写阶段详情 步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。
步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
Combine 阶段
当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认 100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
Shuffle流程
shuffle阶段描述
- maptask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程中,及合并的过程中,都要调用 partitioner 进行分区和针对 key 进行排序
- reducetask 根据自己的分区号,去各个 maptask 机器上取相应的结果分区数据
- reducetask 会取到同一个分区的来自不同 maptask 的结果文件,reducetask 会将这些文件再进行合并(归并排序)
- 合并成大文件后,shuffle 的过程也就结束了,后面进入 reducetask 的逻辑运算过程(从文件中取出一个一个的键值对 group,调用用户自定义的 reduce()方法)
注意:
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认 100M。
Reduce Task流程
Reduce处理描述
- Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
- Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。