2. HDFS

2.1 概述

在现代的企业环境中,单机容量往往无法存储大量数据,需要跨机器存储。统一管理分布在集群上的文件系统称为分布式文件系统

Hadoop 非常适于存储大型数据 (比如 TB 和 PB)就是因为使用 HDFS 作为存储系统。HDFS 使用多台计算机存储文件,并且提供统一的访问接口,像是访问一个普通文件系统一样使用分布式文件系统,特点是距离优先。

HDFS可以将不同主机的文件进行整合,合成一个大的文件系统,如下图所示:

2. HDFS - 图1

2.2 HDFS的应用场景

2.2.1 适合的应用场景

  • 存储非常大的文件(指的是几百M、G、或者TB级别的数据):需要高吞吐量对延时没有要求
  • 采用流式的数据访问方式:即一次写入、多次读取,数据集经常从数据源生成或者拷贝一次,然后在其上做很多分析工作 。
  • 运行于商业硬件上:Hadoop不需要特别贵的机器,可运行于普通廉价机器,可以节约成本
  • 需要高容错性,比如每个节点上存放文件的多个副本
  • 为数据存储提供所需的扩展能力,这里可以通过增加节点来横向扩展

2.2.2 不适合的应用场景

  • 低延时的数据访问:对延时要求在毫秒级别的应用,不适合采用HDFS。HDFS是为高吞吐数据传输设计的,因此可能牺牲延时
  • 大量小文件:文件的元数据保存在NameNode的内存中, 整个文件系统的文件数量会受限于NameNode的内存大小。 经验而言,一个文件/目录/文件块一般占有150字节的元数据内存空间。如果有100万个文件,每个文件占用1个文件块,则需要大约300M的内存。因此十亿级别的文件数量在现有商用机器上难以支持。
  • 多方读写:需要任意的文件修改 HDFS 采用追加(append-only)的方式写入数据,不支持文件任意offset的修改。不支持多个写入器(writer)。

2.3 HDFS的架构

HDFS是一个主/从(Mater/Slave)体系结构。它由四部分组成:HDFS Client、NameNode、DataNode和Secondary NameNode。

2. HDFS - 图2

2.3.1 客户端(Client)

客户端的作用如下:

  • 文件切分:文件上传 HDFS 的时候,Client 将文件切分成一个一个的Block,然后进行存储;
  • 与 NameNode 交互,获取文件的位置信息
  • 与 DataNode 交互,读取或者写入数据
  • 提供一些命令来管理和访问HDFS,比如启动或者关闭HDFS。

2.3.2 NameNode

NameNode是Master,即管理者,其主要作用如下:

  • 管理 HDFS 的名称空间;
  • 管理数据块(Block)映射信息,比如数据块放在哪一个 DataNode 上;
  • 配置副本策略;
  • 处理客户端读写请求。

2.3.3 DataNode

DataNode就是Slave,由 NameNode下达命令,DataNode 执行实际的操作,其主要作用如下:

  • 存储实际的数据块;
  • 执行数据块的读写操作。

2.3.4 Secondary NameNode

Secondary NameNode的主要作用如下:

  • 辅助 NameNode,分担其工作量;
  • 定期合并 fsimage 和 fsedits,并推送给NameNode;
  • 在紧急情况下,可辅助恢复 NameNode。

【注意】:Secondary NameNode 并非 NameNode 的热备,当NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。

2.4 NameNode 和 DataNode

2. HDFS - 图3

2.4.1 NameNode的作用

  • 在内存中保存着整个文件系统的名称空间和文件数据块的地址映射;
  • 整个HDFS可存储的文件数受限于NameNode的内存大小。

2.4.1.1 NameNode元数据信息

  • 保存在内存中,比如文件名、文件目录结构、文件属性(生成时间,副本数,权限)、每个文件的块列表以及列表中的块与块所在的DataNode之间的地址映射关系;
  • 在内存中加载文件系统中每个文件和每个数据块的引用关系(文件、block、datanode之间的映射信息) 数据会定期保存到本地磁盘(fsImage文件和edits文件):
    • fsImage:源文件的镜像文件;
    • edits:日志信息。

2.4.1.2 NameNode文件操作

  • NameNode负责文件元数据的操作,DataNode负责处理文件内容的读写请求;
  • 数据流不经过NameNode,但是会询问 NameNode 跟哪个DataNode联系

2.4.1.3 NameNode副本

  • 文件数据块到底存放到哪些 DataNode 上,是由 NameNode 决定的,NameNode 根据全局情况做出放置副本的决定;

2.4.1.4 NameNode的心跳机制

  • 全权管理数据块的复制,周期性的接受心跳和块的状态报告信息(包含该DataNode上所有数据块的列表);
  • 若接受到心跳信息,如果在10分钟后还接受不到DataNode的心跳,那么NameNode认为DataNode已经宕机,这时候NameNode准备要把DataNode上的数据块重新复制;
  • 块的状态报告包含了一个 DataNode 上所有数据块的列表,blocks report 每个1小时发送一次。

2.4.2 DataNode的作用

提供真实文件数据的存储服务,具体如下:

  • DataNode 以数据块的形式存储 HDFS 文件
  • DataNode 响应 HDFS 客户端读写请求;
  • DataNode 周期性向 NameNode 汇报心跳信息、数据块信息和缓存数据块信息

2.5 HDFS的副本机制和机架感知

2.5.1 HDFS 文件副本机制

所有的文件都是以块(block)的方式存放在 HDFS 文件系统当中,这样做的好处如下:

  • 一个文件有可能大于集群中任意一个磁盘,引入块机制,可以很好的解决这一问题;
  • 使用块作为文件存储的逻辑单位可以简化存储子系统
  • 块非常适合用于数据备份进而提供数据容错能力。

Hadoop 1.x 当中, 文件的 block 块默认大小是 64M,hadoop 2.x 中, 文件的 block 块大小默认是128M,block 块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定

  1. <property>
  2. <name>dfs.block.size</name>
  3. <value>块大小 以字节为单位</value>
  4. </property>

【注意】:一个块中的数据不一定必须要存满

2. HDFS - 图4

2.5.2 机架感知(重点)

通常大型的分布式集群都会跨好几个机架,由多个机架上的机器共同组成一个分布式集群。机架内的机器之间的网络速度通常都会高于跨机架机器之间的网络速度,并且机架之间机器的网络通信通常受到上层交换机间网络带宽的限制。

HDFS分布式文件系统的内部有一个副本存放策略,以默认的副本数等于3为例:

  • 第一个副本块存本机;
  • 第二个副本块存跟本机同机架内的其他服务器节点;
  • 第三个副本块存不同机架的一个服务器节点上。

此外,我们还可以自定义文件的副本数,命令如下:

hadoop fs -setrep [-R] [-w] <numReplicas> <path>

其中:

  • -R:向后兼容;
  • -w:等待副本复制完成;
  • -rep:numReplicas;
  • -path:希望进行副本数调整的hdfs路径

2.6 HDFS命令行的使用

  • ls
    格式:hdfs dfs -ls URI
    作用:类似于Linux的ls命令,显示hdfs的文件列表
    例子:hdfs dfs -ls /
  • lsr
    格式:hdfs dfs -lsr URI
    作用:在整个目录下递归执行ls, 与UNIX中的ls-R类似
    例子:hdfs dfs -lsr /
  • mkdir
    格式:hdfs dfs [-p] -mkdir <paths>
    作用:以中的URI作为参数,创建目录。使用-p参数可以递归创建目录
    【注意】:这里的递归是指当上级目录不存在时可以自动创建
  • put
    格式:hdfs dfs -put <localsrc > ... <dst>
    作用:将单个的源文件src或者多个源文件srcs从本地文件系统拷贝到目标文件系统中(对应的路径)。也可以从标准输入中读取输入,写入目标文件系统中
    例子:hdfs dfs -put /rooot/a.txt /dir1
  • moveFormLocal
    格式:hdfs dfs -moveFromLocal <localsrc> <dst>
    作用:和put命令类似,但是源文件localsrc拷贝之后自身被删除
    例子:hdfs dfs -moveFromLocal /root/install.log /
  • get
    格式:hdfs dfs -get [-ignorecrc ] [-crc] <src> <localdst>
    作用:将文件拷贝到本地文件系统。 CRC校验失败的文件通过 -ignorecrc 选项拷贝。文件和CRC校验和可以通过-CRC选项拷贝
    例子:hdfs dfs -get /install.log /export/servers
  • mv
    格式:hdfs dfs -mv URI <dest>
    作用:将hdfs上的文件从原路径移动到目标路径(移动之后文件删除),该命令不能跨文件系统
    例子:hdfs dfs -mv /dir1/a.txt /dir2
  • rm
    格式:hdfs dfs -rm [-r] 【-skipTrash】 URI 【URI 。。。】
    作用:删除参数指定的文件,参数可以有多个, 此命令只删除文件和非空目录;如果指定 -skipTrash 选项,那么在回收站可用的情况下,该选项将跳过回收站而直接删除文件;否则,在回收站可用时,在HDFS Shell 中执行此命令,会将文件暂时放到回收站中。
    【注意】:这里没有回收站时会创建一个回收站
    例子:hdfs dfs -rm -r /dir1
  • cp
    格式:hdfs dfs -cp URI [URI ...] <dest>
    作用:将文件拷贝到目标路径中。如果 为目录的话,可以将多个文件拷贝到该目录下。 -f 选项将覆盖目标,如果它已经存在;-p 选项将保留文件属性(时间戳、所有权、许可、ACL、XAttr)。
    例子:hdfs dfs -cp /dir1/a.txt /dir2/b.txt
  • cat
    格式:hdfs dfs -cat URI [uri ...]
    作用:将参数所指示的文件内容输出到stdout
    例子:hdfs dfs -cat /install.log
  • chmod
    格式:hdfs dfs -chmod [-R] URI[URI ...]
    作用:改变文件权限。如果使用 -R 选项,则对整个目录有效递归执行。使用这一命令的用户必须是文件的所属用户,或者超级用户。
    例子:hdfs dfs -chmod -R 777 /install.log
  • chown
    格式:hdfs dfs -chmod [-R] URI[URI ...]
    作用:改变文件的所属用户和用户组。如果使用 -R 选项,则对整个目录有效递归执行。使用这一命令的用户必须是文件的所属用户,或者超级用户。
    例子:hdfs dfs -chown -R hadoop:hadoop /install.log
  • appendToFile
    格式:hdfs dfs -appendToFile <localsrc> ... <dst>
    作用:追加一个或者多个文件到hdfs指定文件中.也可以从命令行读取输入。
    例子:hdfs dfs -appendToFile a.xml b.xml /big.xml

2.7 HDFS高级使用命令

2.7.1 HDFS文件限额配置

在多人共用HDFS的环境下,配置设置非常重要。特别是在Hadoop处理大量资料的环境,如果没有配额管理,很容易把所有的空间用完造成别人无法存取HDFS的配额设定是针对目录而不是针对账号,可以让每个账号仅操作某一个目录,然后对目录设置配置

HDFS文件的限额配置允许我们以文件个数,或者文件大小来限制我们在某个目录下上传的文件数量或者文件内容总量,以便达到我们类似百度网盘网盘等限制每个用户允许上传的最大的文件的量

hdfs dfs -count -q -h /user/root/dir1 #查看配额信息

2.7.1.1 数量限额

hdfs dfs -mkdir -p /user/root/dir #创建hdfs文件夹

hdfs dfsadmin -setQuota 2 dir # 给该文件夹下面设置最多上传两个文件,发现只能 上传一个文件

【注意】:因为目录也算一个限额,所以限额为 n 时只能存储 n - 1 个文件。

hdfs dfsadmin -clrQuota /user/root/dir # 清除文件数量限制

2.7.1.2 空间大小限额

在设置空间配额时,设置的空间至少是 blocksize 3 大小(比如本机就应该设置128M _ 3)

hdfs dfsadmin -setSpaceQuota 4k /user/root/dir # 限制空间大小4KB

dd if=/dev/zero of=1.txt bs=1M count=2 # 生成2M的文件

hdfs dfsadmin -clrSpaceQuota /user/root/dir # 清除空间限额配置

2.7.2 HDFS的安全模式

安全模式是hadoop的一种保护机制,用于保证集群中的数据块的安全性。当集群启动的时候,会首先进入安全模式,当系统处于安全模式时会检查数据块的完整性。

假设我们设置的副本数(即参数 dfs.replication )是3,那么在datanode上就应该有3个副本存在,假设只存在2个副本,那么比例就是2/3=0.666。hdfs默认的副本率0.999。我们的副本率0.666明显小于0.999,因此系统会自动的复制副本到其他dataNode,使得副本率不小于0.999。如果系统中有5个副本,超过我们设定的3个副本,那么系统也会删除多于的2个副本。

【注意】:

  • 副本率 = 实际的副本数/设置的副本数

在安全模式状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。当整个系统达到安全标准时,HDFS自动离开安全模式

安全模式操作命令

  • hdfs dfsadmin -safemode get # 查看安全模式状态
  • hdfs dfsadmin -safemode enter # 进入安全模式
  • hdfs dfsadmin -safemode leave # 离开安全模式

2.8 HDFS基准测试

实际生产环境当中,hadoop的环境搭建完成之后,第一件事情就是进行压力测试,测试我们的集群的读取和写入速度,测试我们的网络带宽是否足够等一些基准测试

2.8.1 测试写入速度

向HDFS文件系统中写入数据,10个文件,每个文件 10MB,文件存放到/benchmarks/TestDFSIO中:

hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.5.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB

完成之后查看写入速度结果:

hdfs dfs -text /benchmarks/TestDFSIO/io_write/part-00000

2.8.2 测试读取速度

在HDFS文件系统中读入10个文件,每个文件10M:

hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -read -nrFiles 10 -fileSize 10MB

查看读取结果:

hdfs dfs -text /benchmarks/TestDFSIO/io_read/part-00000

【注意】:

  • 以上结果均可以通过执行目录下的 TestDFSIO_results.log 进行查询。

2.8.3 清除测试数据

hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoop-mapreduce- client-jobclient-2.7.5.jar TestDFSIO -clean

【注意】:

  • 当前日志不会被清除,只会清除 benchmarks 里的数据。

2.9 HDFS文件写入过程(面试重点)

2. HDFS - 图5

  1. Client 发起文件上传请求,通过 RPC(远程调用)与 NameNode 建立通讯,NameNode检测上传权限 (检查目标文件是否已存在,父目录是否存在,返回是否可以上传);
  2. Client 请求第一个 block 该传输到哪些 DataNode 服务器上
  3. NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的 DataNode 的地址如A, B, C
    • Hadoop 在设计时考虑到数据的安全与高效, 数据文件默认在 HDFS 上存放三份,存储策略为本地一份, 同机架内其它某一节点上一份, 不同机架的某一节点上一份。
  4. Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline ), A 收到请求会继续调用 B, 然后 B 调用 C, 将整个 pipeline 建立完成, 后逐级返回 client
  5. Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默认64K),A 收到一个 packet 就会传给 B,B 传给 C。A 每传一个 packet 会放入一个应答队列等待应答
  6. 数据被分割成一个个 packet 数据包在 pipeline 上依次传输, 在 pipeline 反方向上, 逐个发送 ack(命令正确应答), 最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client。
  7. 当一个 block 传输完成之后, Client 再次请求 NameNode 上传第二个 block 到服务器。

2.10 HDFS文件读取过程(面试重点)

2. HDFS - 图6

  1. Client 向 NameNode 发起 RPC 请求,来确定请求文件block所在的位置,这里其实也可以叫权限检查
  2. NameNode会视情况返回文件的部分或者全部block列表对于每个block,NameNode 都会返回含有该 block 副本的所有的DataNode 地址。这些返回的 DataNode 地址会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序有两个规则:
    • 网络拓扑结构中距离 Client 近的排靠前;
    • 心跳机制中超时汇报的 DataNode 状态为 STALE,这样的节点排靠后;
  3. Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性);
  4. 底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类DataInputStream 的 read 方法,直到这个块上的数据读取完毕;
  5. 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向 NameNode 获取下一批的 block 列表

2.11 HDFS的元数据管理

当 Hadoop 的集群当中,NameNode的所有元数据(指文件的时间、大小、权限和块列表等)信息都保存在了 FsImage 与 Eidts 文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存目录配置在了 hdfs-site.xml 当中。

<property>
    <name>dfs.namenode.name.dir</name>
    <value>
        file:///export/servers/hadoop2.7.5/hadoopDatas/namenodeDatas,
        file:///export/servers/hadoop-2.7.5/hadoopDatas/namenodeDatas2
    </value>
</property>
<property>
    <name>dfs.namenode.edits.dir</name>
    <value>file:///export/servers/hadoop-2.7.5/hadoopDatas/nn/edits</value>
</property>

2.11.1 FsImage 和 Edits

  • Edits
    • Edits 存放了客户端最近一段时间的操作日志
    • 客户端对 HDFS 进行写文件时会首先被记录在 Edits 文件中
    • Edits 修改时元数据也会更新。
  • Fsimage
  • NameNode 中关于元数据的镜像,一般称为检查点,Fsimage 存放了一份比较完整的元数据信息
  • 因为 Fsimage 是 NameNode 的完整的镜像,如果每次都加载到内存生成树状拓扑结构,这非常耗内存和CPU,所以一般开始时对 NameNode 的操作都放在 Edits 中
  • Fsimage 内容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元数据信息;
  • 随着 Edits 内容增大, 就需要在一定时间点和 Fsimage 合并

2.11.2 FsImage 中的文件信息查看

命令格式:hdfs oiv

cd /export/servers/hadoop2.7.5/hadoopDatas/namenodeDatas
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml

2.11.3 Edits 中的文件信息查看

命令格式:hdfs oev

cd /export/servers/hadoop2.7.5/hadoopDatas/namenodeDatas
hdfs oev -i edits_0000000000000000865-0000000000000000866 -p XML -o myedit.xml

2.11.4 SecondaryNameNode 如何辅助管理 Fsimage 与 Edits 文件?(重点)

  • SecondaryNameNode 定期合并 Fsimage 和 Edits,把 Edits 控制在一个范围内
  • 首先要配置 SecondaryNameNode
    • SecondaryNameNode 在 conf/masters 中指定;
    • 在 masters 指定的机器上, 修改 hdfs-site.xml
      <property>
      <name>dfs.http.address</name>
      <value>host:50070</value>
      </property>
      
  • 修改 core-site.xml , 这一步不做配置保持默认也可以
    <!-- 多久记录一次 HDFS 镜像, 默认 1小时 -->
    <property>
    <name>fs.checkpoint.period</name>
    <value>3600</value>
    </property>
    <!-- 一次记录多大, 默认 64M -->
    <property>
    <name>fs.checkpoint.size</name>
    <value>67108864</value>
    </property>
    

合并过程如下图:

2. HDFS - 图7

  1. SecondaryNameNode 通知 NameNode 切换 editlog;
  2. SecondaryNameNode 从 NameNode 中获得 fsimage 和 editlog (通过http方式);
  3. SecondaryNameNode 将 fsimage 载入内存,然后开始合并 editlog,合并之后成为新的fsimage;
  4. SecondaryNameNode 将新的 fsimage 发回给 NameNode;
  5. NameNode 用新的 fsimage 替换旧的 fsimage。

特点如下:

  • 完成合并的是 SecondaryNameNode,会请求 NameNode 停止使用 edits,暂时将新写操作放入一个新的文件中 edits.new
  • SecondaryNameNode 从 NameNode 中通过 Http GET 获得 edits,因为要和 fsimage 合并,所以也是通过 Http Get 的方式把 fsimage 加载到内存,然后逐一执行具体对文件系统的操作,与 fsimage 合并,生成新的 fsimage,然后通过 Http POST 的方式把 fsimage 发送给NameNode。 NameNode 从 SecondaryNameNode 获得了 fsimage 后会把原有的 fsimage 替换为新的 fsimage,把 edits.new 变成 edits。同时会更新 fstime;
  • Hadoop 进入安全模式时需要管理员使用 dfsadmin 的 save namespace 来创建新的检查点;
  • SecondaryNameNode 在合并 edits 和 fsimage 时需要消耗的内存和 NameNode 差不多, 所以一般把 NameNode 和 SecondaryNameNode 放在不同的机器上。

2.12 HDFS的API操作

2.12.1 配置环境与导入Maven依赖

  • 下载Hadoop并配置Windows的Hadoop环境变量;
  • 导入相应的maven依赖; ```xml <?xml version=”1.0” encoding=”UTF-8”?> <project xmlns=”http://maven.apache.org/POM/4.0.0

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
    4.0.0

    org.example hdfs_api

    1.0-SNAPSHOT 11 11

     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>2.7.5</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>2.7.5</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-hdfs</artifactId>
         <version>2.7.5</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-core</artifactId>
         <version>2.7.5</version>
     </dependency>
     <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>RELEASE</version>
     </dependency>
    

    org.apache.maven.plugins maven-compiler-plugin 3.1 1.8 1.8 UTF-8 org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade true



<a name="d1cf6f30"></a>
### 2.12.2  使用url方式访问数据(了解)

```java
@Test
public void demo() throws Exception{
    // 1.注册 hdfs 的 url
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    // 2.获取文件输入流
    InputStream inputStream = new
    URL("hdfs://node01:8020/a.txt").openStream();
    // 3.获取文件输出流
    FileOutputStream outputStream = new FileOutputStream(new
    File("D:\\hello.txt"));
    // 4.实现文件的拷贝
    IOUtils.copy(inputStream, outputStream);
    // 5.关闭流
    IOUtils.closeQuietly(inputStream);
    IOUtils.closeQuietly(outputStream);
}

2.12.3 使用文件系统访问数据

2.12.3.1 涉及的主要类

在 Java 中操作 HDFS 主要涉及以下类:

  • Configuration:该类的对象封转了客户端或者服务器的配置
  • FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作, 通过 FileSystem 的静态方法 get 获得该对象。
    FileSystem fs = FileSystem.get(conf)
    
  • get 方法从 conf 中的一个参数 fs.defaultFS 的配置值判断具体是什么类型的文件系统;
  • 如果我们的代码中没有指定 fs.defaultFS , 并且工程 ClassPath 下也没有给定相应的配置, conf 中的默认值就来自于 Hadoop 的 Jar 包中的 coredefault.xml;
  • 默认值为 file:/// , 则获取的不是一个 DistributedFileSystem 的实例, 而是一个本地文件系统的客户端对象

2.12.3.2 获取 FileSystem 的几种方式

  • 第一种方式:
    @Test
     public void getFileSystem() throws IOException {
         // 1.创建configuration对象
         Configuration configuration = new Configuration();
         // 2.设置文件系统类型
         configuration.set("fs.defaultFS", "hdfs://node01:8020/");
         // 3.获取指定的文件系统
         FileSystem fileSystem = FileSystem.get(configuration);
         // 4.输出
         System.out.println(fileSystem);
     }
    
  • 第二种方式:
    @Test
     public void getFileSystem() throws Exception{
         FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
         System.out.println("fileSystem:" + fileSystem);
     }
    
  • 第三种方式
    @Test
     public void getFileSystem() throws Exception{
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", "hdfs://node01:8020");
         FileSystem fileSystem = FileSystem.newInstance(configuration);
         System.out.println(fileSystem.toString());
     }
    
  • 第四种方式
    @Test
     public void getFileSystem() throws Exception{
         FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://node01:8020"), new Configuration());
         System.out.println(fileSystem.toString());
     }
    

2.12.3.3 遍历 HDFS 中所有文件

@Test
public void urlHdfs() throws IOException {
    // 1.注册url
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    // 2.获取hdfs文件的输入流
    InputStream inputStream = new URL("hdfs://node01:8020/test.txt").openStream();
    // 3.获取本地文件的输出流
    FileOutputStream outputStream = new FileOutputStream(new File("D://hello.txt"));
    // 4.实现文件的拷贝
    IOUtils.copy(inputStream, outputStream);
    // 5.关流
    IOUtils.closeQuietly(inputStream);
    IOUtils.closeQuietly(outputStream);
}

2.12.3.4 HDFS 上创建文件夹

@Test
public void mkdirs() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
    boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));
    fileSystem.close();
}

2.12.3.5 HDFS文件下载

@Test
public void getFileToLocal() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
    FSDataInputStream inputStream = fileSystem.open(new Path("/test.txt"));
    FileOutputStream outputStream = new FileOutputStream(new File("D:\\test.txt"));
    IOUtils.copy(inputStream,outputStream);
    IOUtils.closeQuietly(inputStream);
    IOUtils.closeQuietly(outputStream);
    fileSystem.close();
}

简化写法:

public void getFileToLocal() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.212.100:8020"), new Configuration());
    fileSyetem.copyToLocalFile(new Path("/test.txt"), new File("D:\\test.txt"));
    fileSystem.close();
}

2.12.3.6 HDFS 文件上传

@Test
public void putData() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.212.100:8020"), new Configuration());
    fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"), new Path("/mydir/test"));
    fileSystem.close();
}

2.12.3.7 HDFS访问权限控制(之前配置Hadoop集群时已经做过了)

  1. 停止hdfs集群,在node01机器上执行以下命令
    cd /export/servers/hadoop-2.7.5
    sbin/stop-dfs.sh
    
  1. 修改node01机器上的hdfs-site.xml当中的配置文件
    cd /export/servers/hadoop-2.7.5/etc/hadoop
    vim hdfs-site.xml
    
  1. 修改完成之后配置文件发送到其他机器上面去
    scp hdfs-site.xml node02:$PWD
    scp hdfs-site.xml node03:$PWD
    
  1. 重启hdfs集群
    cd /export/servers/hadoop-2.7.5
    sbin/start-dfs.sh
    
  1. 随意上传一些文件到我们hadoop集群当中准备测试使用
    cd /export/servers/hadoop-2.7.5/etc/hadoop
    hdfs dfs -mkdir /config
    hdfs dfs -put *.xml /config
    hdfs dfs -chmod 600 /config/core-site.xml
    # 这里也可以通过伪装用户执行,用重写的get方法
    
  1. 使用代码准备下载文件
    @Test
    public void getConfig()throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(),"hadoop");
    fileSystem.copyToLocalFile(new Path("/config/core-site.xml"), new Path("file:///c:/core-site.xml"));
    fileSystem.close();
    }
    

2.12.3.8 小文件合并

由于 Hadoop 善于存储大文件,因为大文件的元数据信息较少,如果 Hadoop 集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理。

在我们的 HDFS 的 Shell 命令模式下,可以通过命令行将很多的 hdfs 文件合并成一个大文件下载到本地

cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml

既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在上传的时候将小文件合并到一个大文件里面去。

2. HDFS - 图8

@Test
public void mergeFile() throws Exception{
    // 获取分布式文件系统
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.212.100:8020"), new Configuration(),"root");
    FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.txt"));
    // 获取本地文件系统
    LocalFileSystem local = FileSystem.getLocal(new Configuration());
    // 通过本地文件系统获取文件列表,为一个集合
    FileStatus[] fileStatuses = local.listStatus(new Path("file:///D:\\input"));
    for (FileStatus fileStatus : fileStatuses) {
        // 获取输入流
        FSDataInputStream inputStream = local.open(fileStatus.getPath());
        // 将小文件的数据复制到大文件
        IOUtils.copy(inputStream,outputStream);
        // 关闭输入流
        IOUtils.closeQuietly(inputStream);
    }
    IOUtils.closeQuietly(outputStream);
    local.close();
    fileSystem.close();
}

2.13 HDFS的高可用机制

2.13.1 HDFS高可用介绍

在 Hadoop 中,NameNode 所处的位置是非常重要的,整个HDFS文件系统的元数据信息都由 NameNode 来管理NameNode 的可用性直接决定了 Hadoop 的可用性,一旦 NameNode 进程不能工作了,就会影响整个集群的正常使用。

典型的HA集群中,两台独立的机器被配置为 NameNode 。在工作集群中,NameNode 机器中的一个处于Active状态,另一个处于Standby状态Active NameNode 负责群集中的所有客户端操作,而 Standby 充当从服务器。Standby机器保持足够的状态以提供快速故障切换(如果需要)

2. HDFS - 图9

2.13.2 组件介绍

  • ZKFailoverController
    是基于Zookeeper的故障转移控制器,它负责控制NameNode的主备切换,ZKFailoverController会监测NameNode的健康状态,当发现Active NameNode出现异常时会通过Zookeeper进行一次新的选举,完成Active和Standby状态的切换
  • HealthMonitor
    周期性调用 NameNode 的 HAServiceProtocol RPC接口(monitorHealth 和 getServiceStatus),监控NameNode的健康状态并向ZKFailoverController反馈(通过心跳机制)。
  • ActiveStandbyElector
    接收ZKFC的选举请求,通过Zookeeper自动完成主备选举,选举完成后回调 ZKFailoverController 的主备切换方法对NameNode进行Active和Standby状态的切换。
  • DataNode
    NameNode包含了HDFS的元数据信息和数据块信息(blockmap),其中数据块信息通过 DataNode 主动向 Active NameNode 和Standby NameNode 上报
  • 共享存储系统(JournalNoder)
    共享存储系统负责存储HDFS的元数据(EditsLog),Active NameNode(写入)和 Standby NameNode(读取)通过共享存储系统实现元数据同步,在主备切换过程中,新的Active NameNode必须确保元数据同步完成才能对外提供服务

2.13.3 配置高可用

配置高可用需要注意两个关键点:

  • 两个NameNode上的元数据信息必须是同步的
    • 一方面是fsimage同步,一方面是edits同步;
    • simage的一致性是通过一台namenode生成,其他namenode同步;
    • edits同步是通过共享存储实现的,edits日志文件的一致性引入了新模块JournalNode,每次写日志文件时,需要将edits日志同步写入JournalNoder集群,这个步骤成功之后才能认定写文件成功。然后其他stand by节点定期从JournalNode中同步日志,以便进行主备切换。
  • 一个NameNode宕机了,另一个可以很快补上来:
    • 采用zookeeper监控NameNode的状态,两个NameNode节点的状态存在zk中,另外两个NameNode节点分别有一个ZKFailoverController(ZKFC)进程监控NameNode状态,实时读取zookeeper中NameNode的状态,来判断当前NameNode是否已经宕机;
    • 如果主节点上的ZKFC发现主节点宕机,那么它就会发送通知给备用节点上的ZKFC,备用NameNode节点的ZKFC发现主节点已经挂掉,那么就会强制给原本的Active NameNode发送ssh强制关闭请求,如果该ssh请求失败,那么就会调用自定义关闭脚本程序,ZKFC获取到命令返回结果,之后Stand By NameNode被激活成Active NameNode;
    • 强制杀死挂掉的NameNode进程是为了避免出现脑裂(网络分区)现象,两个Active NameNode都可以对外提供服务,会造成数据混乱,资源争夺的情况。

2.14 Hadoop的联邦机制(Federation)

2.14.1 背景概述

单 NameNode 的架构使得 HDFS 在集群扩展性和性能上都有潜在的问题,当集群大到一定程度后,NameNode 进程使用的内存可能会达到上百G,NameNode 成为了性能的瓶颈。因而提出了 Namenode水平扩展方案——Federation

  • Federation 中文意思为联邦、联盟,是NameNode 的 Federation,也就是会有多个NameNode
  • 多个NameNode的情况意味着有多个NameSpace(命名空间),区别于HA模式下的多NameNode,它们是拥有着同一个namespace;

2.14.2 Federation架构设计

HDFS Federation 是解决 NameNode 内存瓶颈问题的水平横向扩展方案。

Federation 意味着在集群中将会有多个namenode/namespace。这些namenode之间是联合的,也就是说,他们之间相互独立且不需要互相协调,各自分工,管理自己的区域。分布式的 Datanode 被用作通用的数据块存储存储设备。每个Datanode要向集群中所有的 Namenode 注册,且周期性地向所有namenode发送心跳和块报告,并执行来自所有namenode的命令。

2. HDFS - 图10

Federation一个典型的例子就是上面提到的 NameNode 内存过高问题,我们完全可以将上面部分大的文件目录移到另外一个 NameNode 上做管理。更重要的一点在于,这些 NameNode 是共享集群中所有的 DataNode 的,它们还是在同一个集群内的

这时候在 DataNode 上就不仅仅存储一个 Block Pool 下的数据了,而是多个(在 DataNode 的 datadir 所在目录里面查看BP-xx.xx.xx.xx打头的目录)。

总的来说,多个NameNode共用一个集群里的存储资源,每个NameNode都可以单独对外提供服务。每个NameNode都会定义一个存储池,有单独的 id,每个DataNode都为所有存储池提供存储。DataNode会按照存储池id向其对应的NameNode汇报块信息,同时,DataNode会向所有NameNode汇报本地存储可用资源情况。

HDFS Federation 有如下不足

HDFS Federation 并没有完全解决单点故障问题。虽然 namenode/namespace 存在多个,但是从单个 namenode/namespace 看,仍然存在单点故障:如果某个namenode挂掉了,其管理的相应的文件便不可以访问。Federation中每个namenode仍然像之前HDFS上实现一样,配有一个 secondary namenode,以便主 namenode 挂掉时,用于还原元数据信息。所以一般集群规模真的很大的时候,会采用 HA+Federation 的部署方案。也就是每个联合的 namenodes 都是 HA 的。