0、HDFS 的数据块一般为多大?

HDFS中的文件在物理上是分块存储(Block),块大小配置,默认大小:Hadoop2.x/3.x:128M、Hadoop1.x:64M
制定的依据:寻址时间为传输时间的1%为最好,因此 HDFS 块的大小设置主要取决于磁盘传输速率
HDFS数据块如果设置过小,则程序寻找块位置耗时过长
HDFS数据块如果设置过大,则磁盘传输数据的时间长,且程序在处理数据时,是以块为单位的,每次都处理这么大一个块,会非常慢

公司项目:hadoop fs -stat "%o %r" fsPath得知,block 大小为 128m,备份3个

一、HDFS文件写入和读取过程

或回答:
1)HDFS读写原理(流程)
2)HDFS上传下载流程
3)讲讲(介绍下)HDFS
4)HDFS存储机制
回答这个问题之前,我们先来看下机架感知机制,也就是HDFS上副本存储结点的选择
image.png
Hadoop3.1.3 副本节点选择(默认3个副本,位置默认如下)

  • 一个是client所处的节点(既本地节点)
    • 节点距离最近(与client节点距离为0),上传数据速度最快
  • 一个是另一个机架的随机一个节点
    • 物理上远离第一个节点(与client节点距离为4),保证数据的可靠性
  • 一个是与第二个节点相同机架的另一个节点
    • 既不选择和第二个副本同一个节点,也不选择另一个机架,而是同机架的不同节点(与第二个节点距离为2)是传输效率和数据安全的折衷考虑

HDFS文件写入
首先涉及三个组件:Client、NameNode、DataNode
image.png

  1. 请求和检查:
    • 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
  2. NN应答请求
    • NameNode 返回是否可以上传。
  3. Block往哪里传
    • 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
  4. NN应答存储位置
    • NameNode 返回 3 个 DataNode 节点(默认为三个副本),分别为 dn1、dn2、dn3。
      • 一个是client所处的节点(既本地节点)
        • 节点距离最近(节点距离为0),上传数据速度最快
      • 一个是另一个机架的随机一个节点
        • 物理上远离第一个节点(节点距离为4),保证数据的可靠性
      • 一个是与第二个节点相同机架的另一个节点
        • 既不选择和第二个副本同一个节点,也不选择另一个机架,而是同机架的不同节点(节点距离为2)是传输效率和数据安全的折衷考虑

image.png

  1. 请求DN连接
    • 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成
      • 相对于和所有待上传的 dn 建立关系,客户端只和一个 dn 建立关系,客户端就只需要维持一个传输通道,减少了很多的工作量
      • 然后再一串串地逐级建立传输关系,每个机器都不会有太繁重的传输通道维持压力
  2. DN应答
    • dn1、dn2、dn3 逐级应答客户端。
  3. 传送 Block 到 DN
    1. 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;
    2. dn1 每传一个 packet 会放入一个应答队列等待应答。
  4. 重复完成后续的Black
    • 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)。

HDFS 读取

image.png

  1. 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。—> Block 的寻址耗时
    • NN 是 HDFS 的文件管理者,客户端只存储数据
  2. 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
  3. DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。
  4. 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
  5. 各个Block的读取,一定是串行读,进行追加拼接

二、MapReduce工作原理

可灵活回答:
1)MapReduce 执行流程
2)对 MapReduce 的理解
3)MapReduce 过程
4)MapReduce 的详细过程
5)MapTask 和 ReduceTask工作机制
6)MapReduce 中有没有涉及到排序
整体设计几个部分:Job 提交(切片规划)、map、shuffle、reduce

  1. 客户端 submit() 时提交了 job.split(切片规划文件)、jar包、job.xml 三个文件到 yarn 中
    1. FileInputFormat 接口的实现类对 hdfs 的文件进行切片规划,生成 job.split 文件
  2. yarn 根据切片规划文件,计算出 MapTask 的数量,然后分发到各个Node上进行 MapTask 的执行
  3. MapTask
  • MapTask 分为五个阶段
    • Read 阶段、Map 阶段、Collect 阶段、Spill 阶段、Merge 阶段
    • 其中 collect - 分区、spill - 缓冲、merge - 合并磁盘小文件 是写入磁盘的三个动作
  • Read 阶段
    • MapTask 通过 **InputFormat** 获得的 **RecordReader**,从输入 **InputSplit** 中解析出一个个 key/value
    • TextInputFormat 和 CombineTextInputFormat 都是按行读取,区别在于数据切片时对小文件的处理
  • Map 阶段
    • 该节点主要是将解析出的 key/value 交给用户编写 **map()** 函数处理,并产生一系列新的 key/value。
  • Collect 阶段
    • 在用户编写 map() 函数中,当数据处理完成后,一般会调用 **OutputCollector.collect()** 输出结果
    • 在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
  • Spill 阶段
    • 即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个个临时文件
    • 需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    • 溢写阶段详情
      • 步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序
      • 步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 **output/spillN.out**(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作
      • 步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
  • Merge 阶段
    • 当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
    • 保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index
    • 在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。
    • 每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
    • 让每个 MapTask 最终只生成一个数据文件可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
  1. ReduceTask
  • ReduceTask 分为三个阶段
    • Copy 阶段、Sort 阶段、Reduce 阶段
  • Copy 阶段
    • ReduceTask 从各个 MapTask 上远程拷贝一片数据(该ReduceTask接受的分区数据),并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  • Sort 阶段
    • 在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
    • 按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据
    • 为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略
    • 由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可
  • Reduce 阶段
    • reduce()函数将计算结果写到 HDFS 上。

image.png
image.png
简洁的手画图
Hadoop 面试题 - 图7

shuffle 过程详解

  1. MapTask 收集我们的 map() 方法输出的 kv 对,放到内存缓冲区中
  2. 从内存缓冲区(环形缓冲区)不断溢出本地磁盘文件,可能会溢出多个文件
  3. 多个溢出文件会被合并成大的溢出文件
  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序 —> 默认使用 HashPartitioner,但一般都会使用自定义的分区器
  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
  6. ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)
  7. 合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)

注意:

  1. Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
  2. 缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。

优化方式

  • 增加combiner,压缩溢写的文件

mapReduce有几种排序及排序发生的阶段

排序概述

  • 排序是MapReduce框架中最重要的操作之一。(开发、面试重点)
  • MapTask 和 ReduceTask 均会对数据按 照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
  • 默认排序是按照字典顺序排序,也可自定义排序方法,且实现该排序的方法是快速排序

对于 MapTask 排序

  • 它会将处理的结果暂时放到环形缓冲区中
  • 当环形缓冲区使用率达到一定阈值后(80%),会对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序
    • 并不是来一条数据就排序一次,而是到了阈值才会对缓冲区的数据进行排序
    • 溢写会发生多次:一个切片对应一个 mapTask,因此一个切片为 128M,而环形缓冲区为 100M,且有效存储据的区域为 50%,因此一定会发生多次溢写,生成多个溢写文件
    • 多文件内容的排序:使用归并算法

对于 ReduceTask 排序

  • 它从每个 MapTask 上远程拷贝相应的数据文件(主动
    • MapTask 的输出不是已经在环形缓冲区排序过了吗,为什么还要排序???
  • 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。
  • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;
  • 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。
  • 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

排序分类

  1. 部分排序
    • 最终的输出结果为多个文件
    • MapReduce 根据输入记录的键对数据集排序。
    • 保证输出的每个文件内部有序。也即,每个 Reducer 输出的文件内容都是有序的
  2. 全排序
    • 最终输出结果只有一个文件,且文件内部有序。
    • 实现方式是只设置一个 ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
  3. 辅助排序:(GroupingComparator分组)
    • 在 Reduce 端对 key 进行分组。
    • 应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce 方法时,可以采用分组排序。
  4. 二次排序
    • 在自定义排序过程中,如果排序的字段值相同,指定第二个字段进行排序
      • 直接在 comparaTo() 方法中,但第一个字段值相同时,用第二个字段进行比较

Yarn 任务工作机制

作业提交

  1. Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业
  2. Client向RM申请一个 ApplicationId —》 Client 报告 RM,需要运行一个MR程序
  3. RM将该应用程序的资源路径ApplicationId 返回给 Client。—》 RM 收到后,告诉 Client,你程序运行需要的资源(jar,job.split,job.xml)提交到我指定的hdfs目录下
  4. Client 运行所需资源(jar包、切片信息和配置文件)提交到HDFS指定的资源路径上。—》 Client 听话地将资源放到这个hdfs的目录下
  5. 程序资源提交完毕后,申请运行 mrAppMaster。—》 Client 提交完资源后,接着向 RM 申请一个管理自己MR程序的 ApplicationManager

作业初始化

  1. RM将用户的请求初始化成一个Task,放入公平调度器中—》 RM 收到这个请求后,将这个请求封装成Task,放到任务队列中,等待空闲的 NodeManager 来认领
  2. 其中一个资源空闲 NodeManager 领取到Task任务。—》 该Task被其中一个NM认领,AM将在这个NM上生成,AM将从 Client 手上全面接管任务事项
  3. 该NodeManager创建容器 Container,并产生MRAppmaster。—》 AM生成完毕
  4. Container 从 HDFS 上拷贝资源到本地。—》 AM 是管理整个MR任务的,因此将首先把任务资源从HDFS捞到自己的机器上

任务分配

  1. MRAppMaster 读取拷贝回来的切片信息,得到需要的 MapTask 资源数,向 RM 申请相同数量运行MapTask的资源
  2. RM将运行MapTask任务分配给另外的NodeManager(可能是同一个,也可能是多个),另两个NodeManager分别领取任务并创建容器。

任务运行(MapTask & ReduceTask)

  1. MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
  2. MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
  3. ReduceTask向MapTask获取相应分区的数据。
  4. 程序运行完毕后,MR会向RM申请注销自己。

进度和状态更新

  • YARN中的任务将其进度和状态(包括counter)返回给应用管理器
  • 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新,展示给用户。

作业完成

  • 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。
  • 时间间隔可以通过mapreduce.client.completion.pollinterval来设置。
  • 作业完成之后,应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

总结

  • 任务不管需要什么机器资源,都需要向RM申请,前期是 Client 申请,后期是 AM 申请
    • Client 申请运行 AM 的机器
    • AM 申请运行 MapTask 和 ReduceTask 的机器

image.png

后续面试题
https://juejin.cn/post/6930598891771396104