技术交流会
大数据技术——Hadoop基础入门 - 图1

一、Hadoop概述

1、简介

Hadoop是Apache旗下的一个用Java语言实现开源软件框架,是一个开发和运行处理大规模数据的软件平台。允许使用简单的编程模型在大量计算机集群上对大型数据集进行分布式处理。

2、发展史

  • Hadoop最早起源于Nutch。Nutch的设计目标是构建一个大型的全网搜索引擎,但随着抓取网页数量的增加,遇到了严重的可扩展性问题——如何解决数十亿网页的存储和索引问题
  • 2003年、2004年谷歌发表的两篇论文。
    • 分布式文件系统(GFS),可用于处理海量网页的存储
    • 分布式计算框架MAPREDUCE,可用于处理海量网页的索引计算问题。
  • Nutch的开发人员完成了相应的开源实现HDFS和MAPREDUCE,并从Nutch中剥离成为独立项目HADOOP,到2008年1月,HADOOP成为Apache顶级项目,迎来了它的快速发展期。

    3、特点

  1. 高可靠性:Hadoop底层维护多个数据副本,所有即使Hadoop某个计算元素或者存储出现故障,也不会导致数据的丢失。
  2. 高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。(动态添加、删除节点)
  3. 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务的处理速度
  4. 高容错性:能够自动将失败的任务重新分配

:::info 狭义上说,Hadoop指Apache这款开源框架,广义上来说,Hadoop通常是指一个更广泛的概念——Hadoop生态圈 ::: image.png

4、组成

Hadoop1.x 与 Hadoop2.x 、Hadoop3.x的区别
image.png

4.1 HDFS(数据存储)

:::info HDFS(Hadoop Distributed File System):分布式文件系统,用于存储和管理数据。它可以将大文件切分成多个块并分布到多个节点上存储,保证高可靠性和高可扩展性。 :::

  1. NameNode(简称 NN ): 存储文件的元数据,如文件名、文件目录结构、文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。
  2. DataNode(简称 DN ): 在本地文件系统存储文件块数据,以及块数据的校验和。
  3. Secondary NameNode(简称 2NN ): 每隔一段时间对NameNode元数据备份。

    4.2 MapReduce(数据处理)

    :::info MapReduce:分布式计算框架,用于将数据处理任务拆分成多个小任务进行并行计算。它包括Map和Reduce两个阶段,Map阶段将数据进行拆分和处理,Reduce阶段将中间结果合并。 ::: MapReduce将计算过程分为两个阶段:Map和Reduce。

  4. Map阶段并行处理输入数据

  5. Reduce阶段对map结果进行汇总

    4.3 YARN(资源调度)

    :::info YARN(Yet Another Resource Negotiator):资源管理器,用于对集群中的资源进行管理和调度,为各种数据处理应用程序提供资源和服务。 ::: YARN的组成:
  • ResourceManager(简称 RM ):整个集群资源(内存、CPU等)的管理者
  • NodeManager(简称 NM ):管理单个节点服务器的CPU、内存等
  • ApplicationMaster(简称 AM ):管理单个任务运行
  • Container:容器,相当于一台独立的服务器,里面封装了任务运行所需要的资源,如内存、CPU、磁盘、网络等

    二、Hadoop安装

    03-单机环境搭建
    05-完全分布式环境搭建
配置项 Hadoop102 Hadoop103 Hadoop104
HDFS NameNode
DataNode
DataNode SecondaryNameNode
DataNode
YARN NodeManager ResourceMananger NodeManager NodeMananger

三、HDFS

1、简介

:::info HDFS(Hadoop Distributed File System):HDFS是Hadoop分布式文件系统,被设计用于存储大规模数据集并具有高容错性、高吞吐量、高可扩展性等特点。它将数据划分为多个块,这些块可被存储在分布式文件系统中的不同计算节点上,确保数据的备份和容错性。 :::

2、架构

NameNode:简称NN,就是Master,是一个管理者

  • 管理HDFS的名称空间,存放文件元数据(文件名、目录结构、文件属性等)
  • 配置副本策略
  • 管理数据块(block)映射信息(文件与数据块的映射、数据块与数据节点的映射)
  • 处理客户端读写请求

DataNode:就是Slave。根据NameNode的指令执行实际的操作

  • 存储实际的数据块
  • 执行数据块的读/写操作

Secondary NameNode:简称2NN。不是NameNode的热备,当NameNode宕机时,并不能马上替换NameNode提供服务

  • 服务NameNode,分担其工作量,比如定期合并 Fsimage 和 Edits,并推送给NameNode
  • 在紧急情况下,可辅助恢复NameNode

企业中一般会将NameNode搭建成高可用,而不是使用2NN。
Client:客户端

  • 文件切分。文件上传 HDFS 时,Client将文件切分成一个个的Block然后进行上传
  • 与NameNode交互,获取文件的位置信息
  • 与DataNode交互,读取或写入数据
  • Client提供一些命令来管理HDFS,比如NameNode格式化
  • Client可以通过一些命令来访问HDFS,比如对HDFS进行增删改查操作

    3、HDFS 文件块

    HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数dfs.blocksize来规定(位于hdfs-default.xml中), hadoop 2.x、hadoop 3.x 中默认大小是 128M,hadoop 1.x中默认大小是 64M。

数据块大小 128M,如果一个文件大小只有1kb,那么只会占用这个数据块中1kb大小的空间,剩余空间依然可以让其他文件进行占用。

最佳的文件块大小配置:机械硬盘的HDFS数据块最佳大小为128M,固态硬盘的HDFS数据块最大小为256M。

计算方式:

  1. 假设寻址时间为10ms,即查找到目标block的时间为10ms
  2. 根据实践,寻址时间为传输时间的 1% 时,系统为最佳状态。因此传输时间为 10ms / 0.01 = 1s。即花费10ms找到这个数据块,花费1s读完这个数据块的内容。
  3. 目前机械硬盘传输速度约 100M/s, 固态硬盘 200M/s - 300M/s
  4. 所以,对于机械硬盘:1s时间可以传输 100M数据,所以数据块设置为 128M 较为合适。对于固态硬盘,数据块大小设置为256M较为合适。

数据块大小设置规则:

  • 如果数据块设置的太小,会增加寻址时间,程序一直在找块的开始位置
  • 如果数据块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间,导致程序在处理这块数据时比较缓慢

总结:HDFS块的大小设置主要取决于磁盘传输速率

4、HDFS的shell操作

基本语法:

  1. # 使用hadoop命令操作hdfs
  2. hadoop fs <具体命令>
  3. # 或者 使用hdfs命令操作hdfs
  4. hdfs dfs <具体命令> # 最后其实也是调用的:hadoop fs <具体命令>

常用命令:

  1. # 列出hdfs上的文件/文件夹列表,类似linuxls命令
  2. hadoop fs -ls /
  3. # 创建文件夹,类似Linuxmkdir命令
  4. hadoop fs -mkdir /wcinput
  5. # 删除hdfs上的文件夹,类似linux rm -rf命令
  6. hadoop fs -rm -f -r /wcoutput
  7. # 统计文件夹的大小信息
  8. # -s 列出总大小,不加该参数时会列出文件夹下每个文件大小
  9. # -h 以适当的单位展示
  10. # 输出结果第一项表示文件大小,第二项表示所有副本加一起的总大小(默认情况下,一个文件有3个副本,所以默认情况下第二项结果等于第一项的3倍)
  11. hadoop fs -du -s -h /jinguo

另外,还有些用于上传、下载操作的命令,例如:

  1. # 从本地剪切,粘贴到hdfs
  2. hadoop fs -moveFromLocal shuguo.txt /sanguo
  3. # 从本地复制,粘贴到hdfs
  4. hadoop fs -copyFromLocal weiguo.txt /sanguo
  5. # 上传,同 copyFromLocal 功能一样
  6. hadoop fs -put aa.txt /wcinput
  7. # 从服务器复制,粘贴到本地
  8. hadoop fs -copyToLocal /sanguo/shuguo.txt ./
  9. # 下载,等同于 copyToLocal
  10. hadoop fs -get /wcinput/aa.txt a.txt

追加一个文件到已经存在的文件末尾:

  1. # liubei.txt追加到shuguo.txt文件末尾
  2. hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt

设置HDFS中文件副本数量:

  1. hadoop fs -setrep 10 /jinguo/weiguo.txt

设置/jinguo/weiguo.txt的副本数量为10个副本。但是我们目前集群中只有3台服务器,所以最多只有3个副本,此时 hdfs 会先将/jinguo/weiguo.txt副本数量设置为10,等将来节点数量如果增加了,就会在新节点中也备份/jinguo/weiguo.txt副本。

5、HDFS读写流程

5.1 写数据的整体流程

HDFS写数据流程示意
image.png
在第7步中,客户端要向DataNode写数据时,是以一种管道(pipeline)的方式,先向一个DataNode写入,然后由该DataNode继续发给下一个、下一个再发给下下个。不是客户端多线程同时写多个DataNode。
客户端向DataNode传输数据时,也不是直接串行将整个文件块写入,而是将文件块拆分成多个64k的数据包(packet),多个数据包并行的对同一个DataNode进行写入。
DataNode写数据完成后会向给自己发送数据的前一级发送应答信号。如果DataNode写数据失败,没有正确发送应答信号,它的前一级会重试重新向该DataNode传输数据。

写数据流程:

  1. HDFS客户端创建对象实例DistributedFileSystem,该对象中封装了与HDFS文件系统操作的相关方法
  2. 调用DistributedFileSystem对象的create()方法,通过 RPC 请求 NameNode创建文件。NameNode执行各种检查判断:目标文件是否存在、父目录是否存在、客户端是否具有创建文件的权限。NameNode就会为本次请求记下一条记录,返回FSDataOutputStream输出流对象给客户端用于写数据。
  3. 客户端通过FSDataOutputStream输出流开始写入数据
  4. 客户端写入数据时,将数据分成一个个数据包(packeg,默认64k)。内部组件DataStreamer请求NameNode挑选出适合存储数据副本的一组DataNode地址,默认是3副本存储。DataStreamer将数据包流式传输到管道(pipeline)的第一个 DataNode,该DataNode存储数据包并将它发送到 pipeline 的第二个DataNode。同样的,第二个DataNode存储数据包并发送给第三个(也是最后一个)DataNode。
  5. 传输的反方向上,会通过ACK机制校验数据包传输是否成功
  6. 客户端完成数据写入后,在FSDataOutputStream输出流上调用close()方法关闭
  7. 客户端DsitributedFileSystem联系NameNode,告知NameNode文件写入完成,等待NameNode确认。因为NameNode已经知道文件由哪些块组成(DataStream请求分配数据块),因此仅需等待最小复制块即可成功返回。最小复制是由参数dfs.namenode.replication.min指定,默认是1。即只要有1个副本上传成功,NameNode就认为已经上传成功,如果其他DataNode有缺失的块,可以通过这个DataNode继续复制。

    5.2 网络拓扑-节点距离计算

    在HDFS写数据的过程中,NameNode会选择距离和待上传数据最近距离的DataNode接收数据。

节点距离的计算:两个节点到达最近的共同祖先的距离总和。
image.png
以上图为例:

  • 如果两个进程都处于d1/r1/n0( d1集群 r1 机架 n-0 节点)上,那么这两个进程之间的距离就是0
  • 如果两个进程位于同一机架不同节点,一个位于d1/r1/n1节点,一个位于d1/r1/n2节点,这两个节点之间的距离就是:(n1到r1的距离) + (r1到n2的距离) = 2
  • 如果两个进程位于同一个集群的不同机架上,一个位于d1/r2/n0,一个位于d1/r3/n2,这两个节点之间的距离就是:(no到r2+r2到d1) + (d1到r3+r3到n2) = 4
  • 如果两个进程是不同集群的节点,那么距离就是6

    5.3 机架感知(副本存储节点的选择)

    根据节点选择的官方说明,当使用的默认3个节点副本时,hdfs选择的副本存储节点为:

  • 副本1存储在本机节点 one replica on the local machine

  • 副本2存储在另一个机架的一个节点 another replica on a node in a different (remote) rack
  • 副本3存储在和副本2相同机架的另一个节点 the last on a different node in the same remote rack

image.png

5.4 HDFS读数据流程

image.png
在第1步时,NameNode接到了客户端的请求,会判断客户端的用户是否有权限读,并且判断hdfs中是否有该文件,然后将元数据响应给客户端。

在第3步时,客户端要从DataNode中读取数据,而一个文件块会有多个副本,客户端会考虑哪个DataNode离自己最近,并且该DataNode的访问量负载不是很高才从这个DataNode上下载。

在第5步请求第二个文件块blk_2时,是在已经读取完成了blk_1之后才会发出该请求,是串行读,不是多线程并行。最后将读到的blk_2数据追加到blk_1末尾,就可以拼接成一个完整的文件。

所以,我们在 hadoop 服务器上的data文件夹中找到hadoop存放数据的文件夹$HADOOP_HOME/data/dfs/data/current/BP-xxxxxxx/current/finalized/subdir0/subdir0,在里面将某个文件的几个blk_xxx按顺序拼接,也能恢复出原文件:

  1. # 需要能找到abc.txt文件拆分的文件块,以及这些文件块所在的服务器、编号
  2. # 因为我们集群只有3台服务器,且hdfs默认副本数量为3,所以每个文件块在这3台服务器上都能找到
  3. # 假设 abc.txt被拆分成的文件块为 blk_1 blk_2(文件块编号可以在hdfs页面上查看到,即 blk_BlockID
  4. # blk_1的数据写入abc.txt
  5. cat blk_1 >> abc.txt
  6. # blk_2的数据追加到abc.txt
  7. cat blk_2 >> abc.txt
  8. # 此时就可以恢复出文件abc.txt

6、NN 和 2NN

NameNode和SecondaryNameNode工作流程示意:
image.png :::info fsimages(镜像文件)是负责记录最终实际值。
edits(编辑日志)存储的是每次数据的操作步骤。例如:edits_inprogress_xxx :::

在没有 SecondaryNameNode 的集群中,NameNode的工作流程是:

  1. 系统启动后,将edits和fsimage从硬盘加载到内存中,方便数据的快速增删改。先加载fsimage内容到内存,然后在内存中按顺序把将edits中修改的内容执行一遍。
  2. 元数据发生变动时,edits中记录改变向量,内存中数据也同步修改,但是不写入磁盘的fsimage
  3. 系统关闭时,将edits中的改变向量按顺序执行,写入到fsimage中

有了SecondaryNameNode的集群,SecondaryNameNode会定期询问NameNode是否到达检查点(checkpoint),如果到达了检查点,SecondaryNameNode就辅助NameNode执行edits信息向fsimage中写入。
NameNode工作流程为:

  1. 系统启动,将edits和fsimage从磁盘加载到内存中
  2. 客户端发出修改元数据的请求给NameNode
  3. NameNode将修改数据的改变向量写入磁盘的edit_inprogress中,然后同步修改掉内存中的数据

SecondaryNameNode工作流程为:

  1. 向NameNode询问是否到达检查点(默认如果Edits记录的修改次数达到100万,或者距离上个checkpoint时间间隔了1小时,就到达了检查点)
  2. 如果到达检查点,请求执行Checkpoint
  3. NameNode的edit_inprogress_001中存储的改变向量滚动写入Edits中。在edit_inporgress_001写入edits过程中,如果客户端向NameNode发出改变元数据的请求,这部分新的改变向量被暂时先写入edit_inprogress_002中。
  4. 将NameNode的Edits、fsimage拷贝到SecondaryNameNode
  5. 将Edits、fsimage信息加载到自己的内存中。在fsimage基础上顺序执行Edits中的改变向量
  6. 将内存中的计算结果写入磁盘fsimage.checkpoint
  7. 将fsimage.checkpoint拷贝给NameNode
  8. NameNode将fsimage.checkpoint重命名为fsimage,覆盖原有的fsimage

    7、DataNode工作机制

    DataNode工作流程:image.png
    一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件:一个是数据本身,一个是元数据 xxxxx.meta(包括数据块的长度、块数据的校验和,以及时间戳)。
    image.png
    DataNode启动后向NameNode注册,通过后,周期性(默认6小时)的向NameNode上报所有的块信息。

心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令(如复制数据块到另一台机器,或删除某个数据块)。如果超过10分钟 + 30秒(超过10分钟,再给10次机会)还没有收到某个DataNode的心跳,就认为该节点不可用,认为该节点宕机了,不会再向该节点传输信息。

四、MapReduce

1、简介

:::info MapReduce:是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 :::

2、入门案例

官方案例:

  1. hadoop jar /opt/module/hadoop-3.2.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.3.jar wordcount /input/word.txt /output

用户编写的程序分为三个部分:

  • Mapper
  • Reducer
  • Driver

Hadoop在Java数据类型基础上又封装了新的数据类型,这些类都实现了Hadoop的序列化接口Writable:位于org.apache.hadoop.io包下

Java类型 Hadoop Writable类型
String Text
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWtitable
Double DoubleWritable
Map MapWritable
Array ArrayWritable
Null NullWritable

环境搭建:

  1. 创建Maven工程,加入依赖:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.hadoop</groupId>
    4. <artifactId>hadoop-client</artifactId>
    5. <version>3.2.3</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>junit</groupId>
    9. <artifactId>junit</artifactId>
    10. <version>4.12</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.slf4j</groupId>
    14. <artifactId>slf4j-log4j12</artifactId>
    15. <version>1.7.30</version>
    16. </dependency>
    17. </dependencies>
  2. 配置日志:log4j.properties

    1. log4j.rootLogger=INFO, stdout
    2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    5. log4j.appender.logfile=org.apache.log4j.FileAppender
    6. log4j.appender.logfile.File=target/hadoop-client.log
    7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    Mapper

    Mapper代码编写:

  • 用户自定义的Mapper要继承Mapper类,传入的泛型为(<输入数据key的类型, 输入数据value的类型, 输出结果key的类型,输出结果value的类型>)
  • Mapper的输入数据是Key-Value形式(key、value的类型可以自定义)
  • 用户的实现类中需要重写Mapper类的map()方法,在方法中写业务逻辑
  • Mapper的输出数据是key-value的形式(key、value的类型可以自定义)
  • map()方法(MapTask进程)对每一个 key-value 调用一次,对输入的数据文件内容默认按行处理 ```java package com.study.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

  • Mapper泛型:
  • 输入的key类型:本程序需要将偏移量当做key,所以是LongWritable类型
  • 输入的value类型:一般都是文本字符串,所以是Text类型
  • 输出的key类型:本程序的Mapper输出的是单词数量,所以key是单词,Text类型
  • 输出的value类型:单词的个数,所以是IntWritable类型 */ public class WordCountMapper extends Mapper {

    private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1); // 因为我们map阶段不聚合,每个单词出现一次就记一个1 @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {

    1. String line = value.toString(); // 获取一行信息
    2. String[] words = line.split(" ");// 拆分一行内容中的单词
    3. for (String word : words) {
    4. outKey.set(word); // 将word转换成Text类型
    5. // 将 key-value 输出到 context 中,供后面的Reducer使用
    6. context.write(outKey, outValue); // xxx单词出现了1次
    7. }

    } } ```

    Reducer

    Reducer编写:

  • 用户自定义的Reducer需要继承Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是 key-value 形式
  • 用户的实现类中需要重写Reducer类的reduce()方法,在方法中写业务逻辑
  • ReduceTask进程对每一组相同 key 的 key-value 组调用一次reduce()方法 ```java 假如Mapper计算的结果key-value为以下内容: { a:1} { a:1} { c:1 } { b:1 }

到了reduce()方法中,就会将相同的key组成一个组,值为一个集合: { a: [1, 1], b: [1], c: [1] }

  1. ```java
  2. package com.study.mapreduce.wordcount;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Reducer;
  7. import java.io.IOException;
  8. /**
  9. * Reducer泛型:
  10. * 输入的key类型:单词字符串,即Mapper的输出的key类型
  11. * 输入的value类型:单词出现次数(因为map没有聚合,所以每个value都是1)即mapper的输出的value类型
  12. * 输出的key类型:单词,所以是Text类型
  13. * 输出的value类型:汇总的单词个数,所以是IntWritable类型
  14. */
  15. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
  16. IntWritable outValue = new IntWritable();
  17. @Override
  18. protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  19. int sum = 0;
  20. for (IntWritable value : values) {
  21. sum += value.get(); // value是IntWritable类型,需要调用get()进行类型转换
  22. }
  23. outValue.set(sum);
  24. context.write(key, outValue);
  25. }
  26. }

Driver

相当于Yarn集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了的MapReduce程序相关运行参数的job对象。

  1. package com.study.mapreduce.wordcount;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. public class WordCountDriver {
  11. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  12. // 1. 获取job
  13. Configuration config = new Configuration();
  14. Job job = Job.getInstance(config);
  15. // 2.设置 jar 路径
  16. job.setJarByClass(WordCountDriver.class); // 可以直接通过当前类的全类名反射获取到jar包路径
  17. // 3.关联mapper和reducer
  18. job.setMapperClass(WordCountMapper.class);
  19. job.setReducerClass(WordCountReducer.class);
  20. // 4.设置map输出的key、value类型
  21. job.setMapOutputKeyClass(Text.class);
  22. job.setMapOutputValueClass(IntWritable.class);
  23. // 5.设置最终输出(最终输出不一定是Reducer输出)的key、value类型
  24. job.setOutputKeyClass(Text.class);
  25. job.setOutputValueClass(IntWritable.class);
  26. // 6.设置输入路径和输出路径
  27. FileInputFormat.setInputPaths(job, new Path("/app/WordCount/input")); // 本地文件路径,可以输入多个
  28. FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/output/output2")); // 本地文件路径(需要是一个不存在的文件夹,否则会报错目录已存在)
  29. // 可以调用 job.submit() 提交作业
  30. // 但是为了调试,可以调用waitForCompletion,传入一个true,让程序输出监控信息
  31. // waitForCompletion内部也是调用了 job.submit()
  32. boolean success = job.waitForCompletion(true);
  33. System.exit(success ? 0 : 1); // 程序退出
  34. }
  35. }

3、Hadoop序列化

序列化:就是把内存中的对象转换成字节序列(或其他数据传输协议),以便于存储到磁盘(持久化)和网络传输。

反序列化:就是将收到的字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

因为Java自带的序列化存储了很多额外信息(各种校验、Header、继承体系等),是一个重量级序列化框架,不便于在网络中传输,所以Hadoop有自己的一套序列化。

Hadoop序列化的特点:

  • 紧凑:存储空间少
  • 快速:传输速度快
  • 互操作性:支持多语言的交互

实际开发中,Hadoop自带的Text、IntWritable等基本的序列化类型往往不够用,需要自定义一些可序列化的 JavaBean

自定义需要序列化的类:

  1. 必须实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须要有空参构造
  3. 实现序列化方法write():
  4. 实现反序列化方法readFields():
  5. 注意反序列化顺序要和序列化的顺序完全一致(先进先出)
  6. 如果想把结果显示在文件中,还需要重写toString()方法
  7. 如果要把自定义的类的对象放在key中传输,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求key必须能够排序

4、序列化案例

需求:统计每个手机号消耗的总上行流量、总下行流量、总流量
数据格式

  1. 1 13892386621 127.0.0.1 200 500 200
  2. 2 11234426621 127.0.0.1 231 322 200
  3. 3 11234426621 127.0.0.1 300 400 200
  4. 4 13892386621 127.0.0.1 300 600 200
  5. id 手机号 网络ip 上行流量 下行流量 网络状态码

期望输出格式

  1. 13892386621 500 1100 1600
  2. 手机号 总上行流量 总下行流量 总流量
  1. 实例化对象 ```java package com.hadoop.demo.flow; import com.hadoop.demo.writable.MyWritable; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class FlowBean implements Writable, Comparable { // 上行流量 private long upFlow; // 下行流量 private long downFlow; // 总流量 private long sumFlow;

  1. public FlowBean() {
  2. }
  3. @Override
  4. public int compareTo(FlowBean o) {
  5. return 0;
  6. }
  7. @Override
  8. public void write(DataOutput dataOutput) throws IOException {
  9. dataOutput.writeLong(upFlow);
  10. dataOutput.writeLong(downFlow);
  11. dataOutput.writeLong(sumFlow);
  12. }
  13. @Override
  14. public void readFields(DataInput dataInput) throws IOException {
  15. this.upFlow = dataInput.readLong();
  16. this.downFlow = dataInput.readLong();
  17. this.sumFlow = dataInput.readLong();
  18. }
  19. public long getUpFlow() {
  20. return upFlow;
  21. }
  22. public void setUpFlow(long upFlow) {
  23. this.upFlow = upFlow;
  24. }
  25. public long getDownFlow() {
  26. return downFlow;
  27. }
  28. public void setDownFlow(long downFlow) {
  29. this.downFlow = downFlow;
  30. }
  31. public long getSumFlow() {
  32. return sumFlow;
  33. }
  34. public void setSumFlow() {
  35. this.sumFlow = this.upFlow + this.downFlow;
  36. }
  37. @Override
  38. public String toString() {
  39. return upFlow + " " + downFlow + " " + sumFlow;
  40. }

}

  1. 2. Mapper
  2. ```java
  3. package com.hadoop.demo.flow;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import java.io.IOException;
  8. public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
  9. private Text outK = new Text();
  10. private FlowBean outV = new FlowBean();
  11. @Override
  12. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
  13. // 获取每一行数据
  14. String line = value.toString();
  15. String[] split = line.split(" ");
  16. // 抓取想要的数据
  17. // 手机号
  18. String phone = split[1];
  19. // 上行流量
  20. String up = split[split.length - 3];
  21. // 下行流量
  22. String down = split[split.length - 2];
  23. // 封装
  24. outK.set(phone);
  25. outV.setUpFlow(Long.parseLong(up));
  26. outV.setDownFlow(Long.parseLong(down));
  27. outV.setSumFlow();
  28. // 写出
  29. context.write(outK, outV);
  30. }
  31. }
  1. Reducer ```java package com.hadoop.demo.flow;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer {

  1. private FlowBean flowBean = new FlowBean();
  2. @Override
  3. protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
  4. // 遍历集合累计求和
  5. long totalUp = 0;
  6. long totalDown = 0;
  7. for (FlowBean flowBean : values) {
  8. totalUp += flowBean.getUpFlow();
  9. totalDown += flowBean.getDownFlow();
  10. }
  11. // 封装
  12. flowBean.setUpFlow(totalUp);
  13. flowBean.setDownFlow(totalDown);
  14. flowBean.setSumFlow();
  15. // 写出
  16. context.write(key, flowBean);
  17. }

}

  1. 4. Driver
  2. ```java
  3. package com.hadoop.demo.flow;
  4. import com.hadoop.demo.wordcount.WordCountDriver;
  5. import com.hadoop.demo.wordcount.WordCountMapper;
  6. import com.hadoop.demo.wordcount.WordCountReducer;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.IntWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import java.io.IOException;
  15. public class FlowDriver {
  16. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  17. // 1. 获取job
  18. Configuration config = new Configuration();
  19. Job job = Job.getInstance(config);
  20. // 2.设置 jar 路径
  21. job.setJarByClass(FlowDriver.class);// 可以直接通过当前类的全类名反射获取到jar包路径
  22. // 3.关联mapper和reducer
  23. job.setMapperClass(FlowMapper.class);
  24. job.setReducerClass(FlowReducer.class);
  25. // 4.设置map输出的key、value类型
  26. job.setMapOutputKeyClass(Text.class);
  27. job.setMapOutputValueClass(FlowBean.class);
  28. // 5.设置最终输出(最终输出不一定是Reducer输出)的key、value类型
  29. job.setOutputKeyClass(Text.class);
  30. job.setOutputValueClass(FlowBean.class);
  31. // 6.设置输入路径和输出路径
  32. FileInputFormat.setInputPaths(job, new Path("D:\\Hadoop\\input2")); // 本地文件路径,可以输入多个
  33. FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\output2")); // 本地文件路径(需要是一个不存在的文件夹,否则会报错目录已存在)
  34. // FileInputFormat.setInputPaths(job, new Path(args[0]));
  35. // FileOutputFormat.setOutputPath(job, new Path(args[1]));
  36. // 可以调用 job.submit() 提交作业
  37. // 但是为了调试,可以调用waitForCompletion,传入一个true,让程序输出监控信息
  38. // waitForCompletion内部也是调用了 job.submit()
  39. boolean success = job.waitForCompletion(true);
  40. System.exit(success ? 0 : 1); // 程序退出
  41. }
  42. }

5、MapReduce框架原理

MapReduce框架运行流程:image.png
MapReduce框架主要分为Map阶段和Reduce阶段,在这里我们统称“MapTask”、“ReduceTask”。
MapTask阶段:

  1. 确认数据源input,选择InputFormat文件读取方式(默认是按行读取,K-V模式,K为偏移量,V为每行数据)

Reduce阶段:

  1. Reducer主动拉取Mapper中的数据,中间会经历Shuffle阶段
  2. Shuffle阶段的功能有:排序、分区、压缩、合并
  3. Reducer处理完数据后,根据OutputFormat选择输出方式,也可以自定义输出方式(Mysql、ES、Hbase)

6、切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
数据块:Block是HDFS物理上把数据分成一块一块的,数据块是HDFS存储数据单位。一般为128M
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
数据块是物理上的分开存储,例如一个129Mb的数据文件,在存入 hdfs 时,因为hdfs一个数据块默认大小只有128Mb,所以会被分成两个数据块存储:block0存储128Mb,block1存储1Mb,这两个数据块可能存储在不同的服务器上,这个是物理上的分开存储。
数据切片是逻辑上的切片,不是真正的物理磁盘上分开存储。例如将这个129Mb的文件切成一个100Mb和一个29Mb两个片,那么只会找个位置记录下来:0-100索引位置属于第一个片,100-129索引位置属于第二个片,并不影响物理上的存储。
切片的大小会影响到执行的效率
image.png
总结:

  1. 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
  2. 每一个Split切片分配一个MapTask并行实例处理
  3. 默认情况下,切片大小等于数据块大小
  4. 切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。

    例如: 输入数据中有两个文件(a.txt、b.txt),其中 a.txt 大小为300Mb, b.txt大小为100Mb。切片时,不会按整体400Mb切分,而是逐个对这两个文件进行切分。 切片大小为128Mb的话:先将a.txt切分成3个片:128Mb、128Mb、44Mb,然后再将b.txt切分成一个100Mb的片。最后产生4个切片、4个MapTask。而不是将b.txt追加到a.txt上,因为b.txt的block和a.txt的block不是同一个,如果合一起切就又会产生网络IO。

7、job提交流程的源码分析

根据前面写的WordCount程序可知,MapReduce中真正执行job的是job.waitForCompletion(true)方法。job.waitForCompletion(true)方法中调用了submit()提交job
submit()方法的源码如下:

  1. public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  2. ensureState(JobState.DEFINE); // 1.确认job的状态为DEFINE(未运行)状态
  3. setUseNewAPI(); // 2.如果使用的是mapred包(hadoop 1.x)中的类,做一些特殊配置进行兼容
  4. // 3.获取hadoop集群的连接
  5. // connect()方法内部会调用到Cluster类构造方法,Cluster构造方法会调用initialize()初始化方法
  6. // Cluster的initialize()初始化方法中,会得到两个Provider:YarnClientProtocolProvider、LocalClientProtocolProvider
  7. // 如果配置项mapreduce.framework.name的值为yarn,则使用YarnClientProtocolProvider;如果为local,则使用LocalClientProtocolProvider
  8. // mapreduce.framework.name在mapred-default.xml默认值为local,我们本地的hadoop没有修改该配置项,所以本地运行WordCount会使用本地模式运行,即输入、输出路径都是本地路径。而我们的hadoop102集群中,在mapred-site.xml中将该值配置为了yarn,所以在hadoop102上运行WordCount程序时走的是Yarn,即输入、输出路径都是hdfs路径。
  9. connect();
  10. final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  11. status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
  12. public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
  13. // 4.向集群提交job信息
  14. return submitter.submitJobInternal(Job.this, cluster);
  15. }
  16. });
  17. state = JobState.RUNNING; // 将DEFINE(未运行)状态改为 RUNNING(运行)
  18. LOG.info("The url to track the job: " + getTrackingURL());
  19. }

submitter.submitJobInternal(Job.this, cluster)向集群提交job信息的源码:

  1. JobStatus submitJobInternal(Job job, Cluster cluster)
  2. throws ClassNotFoundException, InterruptedException, IOException {
  3. // 1.校验output文件夹是否存在。如果存在,会抛出异常output文件夹已存在
  4. checkSpecs(job);
  5. Configuration conf = job.getConfiguration();
  6. addMRFrameworkToDistributedCache(conf);
  7. // 2.获取临时路径(该路径后面会用于临时存放job的切片等信息,处理完毕后该文件夹会清空)
  8. // 该路径的前面一部分可以通过mapreduce.jobtracker.staging.root.dir进行配置
  9. // 如果没有配置,则默认取/tmp/hadoop/mapred/staging
  10. // 然后再后面加上 <当前用户名(用户名为空则取dummy)+随机数>/.staging
  11. // 例如:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging
  12. Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  13. // ......配置一些命令行信息。省略
  14. // 3.创建一个jobID,提交的每个任务都有一个唯一的jobID,例如:job_local2113150384_0001
  15. JobID jobId = submitClient.getNewJobID();
  16. job.setJobID(jobId);
  17. // 4.在上面创建的/tmp/hadoop/mapred/staging/tengyer2113150384/.staging基础上,
  18. // 创建一个新的path:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
  19. Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  20. JobStatus status = null;
  21. try {
  22. // ........ 中间这一块是设置一些配置信息、缓存信息,省略
  23. // 5.将job的jar包、依赖、配置文件等内容提交到集群
  24. // yarn模式才会将jar包提交到集群,local模式不提交
  25. copyAndConfigureFiles(job, submitJobDir);
  26. Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
  27. LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
  28. // 6.切片,将切片信息临时保存到/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
  29. // 会生成 【job.split、.job.split.crc、job.splitmetainfo、.job.splitmetainfo.crc】文件,保存切片信息
  30. int maps = writeSplits(job, submitJobDir);
  31. // 将切片的个数赋值给将来要创建的MapTask数量,有几个切片就有几个MapTask
  32. conf.setInt(MRJobConfig.NUM_MAPS, maps);
  33. LOG.info("number of splits:" + maps);
  34. int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
  35. MRJobConfig.DEFAULT_JOB_MAX_MAP);
  36. if (maxMaps >= 0 && maxMaps < maps) {
  37. throw new IllegalArgumentException("The number of map tasks " + maps +
  38. " exceeded limit " + maxMaps);
  39. }
  40. // ....... 队列、缓存等信息。省略
  41. // 7.在/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001下创建 【jbo.xml、.job.xml.crc】
  42. // job.xml中保存了job运行需要的配置参数信息
  43. writeConf(conf, submitJobFile);
  44. //
  45. // Now, actually submit the job (using the submit name)
  46. // 8.真正的提交 job
  47. //
  48. printTokens(jobId, job.getCredentials());
  49. status = submitClient.submitJob(
  50. jobId, submitJobDir.toString(), job.getCredentials());
  51. if (status != null) {
  52. return status;
  53. } else {
  54. throw new IOException("Could not launch job");
  55. }
  56. } finally {
  57. i (status == null) {
  58. LOG.info("Cleaning up the staging area " + submitJobDir);
  59. if (jtFs != null && submitJobDir != null)
  60. jtFs.delete(submitJobDir, true);
  61. }
  62. }
  63. }

最后,在job.waitForCompletion(true)运行完monitorAndPrintJob()方法后,/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001文件夹被清空。

8、FileInputFormat切片源码分析

submitter.submitJobInternal(Job.this, cluster)向集群提交job信息的源码中,执行到int maps = writeSplits(job, submitJobDir);时会进行切片。

以本次的WordCount程序为例,该程序使用到的InputFormat实现类为FileInputFormat,其getSplits(JobContext job)方法源码:

  1. public List<InputSplit> getSplits(JobContext job) throws IOException {
  2. StopWatch sw = new StopWatch().start();
  3. // getFormatMinSplitSize()在本类中固定返回1
  4. // getMinSplitSize(job)获取配置项mapreduce.input.fileinputformat.split.minsize的值,在mapred-default.xml中该配置项默认值为0
  5. // 所以两个数取最大值结果为1
  6. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  7. // 获取配置项mapreduce.input.fileinputformat.split.maxsize的值,mapred-default.xml中默认没有该配置项
  8. // 没有该配置项时,取默认值:Long.MAX_VALUE (Long的最大值)
  9. long maxSize = getMaxSplitSize(job);
  10. // generate splits
  11. List<InputSplit> splits = new ArrayList<InputSplit>();
  12. List<FileStatus> files = listStatus(job);
  13. boolean ignoreDirs = !getInputDirRecursive(job)
  14. && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  15. // 循环遍历所有输入文件,分别对每个文件进行分片
  16. // 这也证明了:切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。
  17. for (FileStatus file: files) {
  18. if (ignoreDirs && file.isDirectory()) {
  19. continue;
  20. }
  21. // 获取文件路径、内容长度
  22. Path path = file.getPath();
  23. long length = file.getLen();
  24. if (length != 0) {
  25. BlockLocation[] blkLocations;
  26. if (file instanceof LocatedFileStatus) {
  27. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  28. } else {
  29. FileSystem fs = path.getFileSystem(job.getConfiguration());
  30. blkLocations = fs.getFileBlockLocations(file, 0, length);
  31. }
  32. // 判断文件是否允许切片
  33. // 对于普通文本文件,从文件中间截断不会有影响。
  34. // 但是如果是一些压缩文件(有些压缩文件也支持切片),截断后内容不完整程序就无法处理类。所以一些压缩文件就只允许切成1个片(即没有切片)
  35. if (isSplitable(job, path)) {
  36. // 获取数据块大小。
  37. // local本地模式,默认的数据块大小是 32Mb
  38. // Hadoop 1.x:默认的数据块大小是 64Mb
  39. // Hadoop 2.x/3.x:默认的数据块大小是 128Mb
  40. long blockSize = file.getBlockSize();
  41. // 计算切片大小
  42. // 该方法的实现为:Math.max(minSize, Math.min(maxSize, blockSize))
  43. // 由前面的配置可知,minSize为1,maxSize为Long.MAX_VALUE,blockSize为 32Mb(当前是本地模式运行)
  44. // 所以 Math.min(maxSize, blockSize) = 32Mb, Math.max(minSize, 32Mb) = 32Mb
  45. // 即:默认情况下,切片大小等于块大小
  46. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  47. // 进行切片,只有大于切片大小的 SPLIT_SLOP(本类中该值固定为1.1) 倍才切。
  48. // 例如,传入一个 66Mb 的文件:
  49. // 第一次进while时,66Mb > 35.2Mb(32Mb * 1.1), 会被切分一个 32Mb的片,剩余 34Mb
  50. // 第二次进while时, 34Mb < 35.2Mb (32Mb * 1.1), 那么剩下的 34Mb 就不再切分了,就当做1个片处理
  51. // 这样做的好处是:防止过度切分,导致最后一台服务器过于空闲,初始化时间比真正处理数据时间还长,拉低效率。
  52. // 例如上面剩下的 34Mb 如果继续切分成 32Mb 和 2 Mb的片,最后处理 2Mb 的那台机器真正处理的数据量太少,造成资源浪费、效率降低
  53. long bytesRemaining = length;
  54. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  55. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  56. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  57. blkLocations[blkIndex].getHosts(),
  58. blkLocations[blkIndex].getCachedHosts()));
  59. bytesRemaining -= splitSize;
  60. }
  61. if (bytesRemaining != 0) {
  62. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  63. splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  64. blkLocations[blkIndex].getHosts(),
  65. blkLocations[blkIndex].getCachedHosts()));
  66. }
  67. } else { // not splitable
  68. // 对于不能切片的文件,直接设置成1个片
  69. if (LOG.isDebugEnabled()) {
  70. // Log only if the file is big enough to be splitted
  71. if (length > Math.min(file.getBlockSize(), minSize)) {
  72. LOG.debug("File is not splittable so no parallelization "
  73. + "is possible: " + file.getPath());
  74. }
  75. }
  76. splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
  77. blkLocations[0].getCachedHosts()));
  78. }
  79. } else {
  80. //Create empty hosts array for zero length files
  81. splits.add(makeSplit(path, 0, length, new String[0]));
  82. }
  83. }
  84. // Save the number of input files for metrics/loadgen
  85. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  86. sw.stop();
  87. if (LOG.isDebugEnabled()) {
  88. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  89. + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  90. }
  91. return splits;

writeNewSplits方法执行完input.getSplits(job)切片后,会调用将切片信息写入stag文件夹中:JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

InputSplit只是记录了切片的元数据信息,比如切片的起始位置、长度、所在的节点列表等,并不是真的进行了切片。提交切片规划文件到Yarn上,Yarn上的MRAppMaster就可以根据切片规划文件计算开启MapTask个数。

9、修改切片的大小

根据切片的源码可知,切片的大小为:

  1. long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

其中,blockSize为数据块大小,一般由磁盘读写速度决定,平时不会随便更改。
所以,如果要修改切片大小,可以通过调整minSize、maxSize进行修改:

  • 如果想要调整的切片大小,大于数据块的大小,就需要调整minSize的大小。max(minSize, min(Long.MAX_VALUE, blockSize))时就可以取到minSize
  • 如果想要调整的切片大小,小于数据块的大小,就需要调整maxSize的大小。max(1, min(maxSize, blockSize))时 就可以取到maxSize
  • minSize可以通过mapred-site.xml的mapreduce.input.fileinputformat.split.minsize配置(默认在mapred-default.xml中配置为0);
  • maxSize可以通过mapred-site.xml的mapreduce.input.fileinputformat.split.maxsize配置(默认在mapred-default.xml中没有配置);

10、TextInputFormat切片机制

FileInputFormat常见的实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat、自定义InputFormat等。
TextInputFormat是FileInputFormat的默认实现类
按行读取每条记录。
key是存储在该行在整个文件中的起始字节偏移量(即该行第一个字符在整个文件中的位置),LongWritable类型。
value是这行的内容,不包括任何终止符(换行符和回车符),Text类型
c08b60e9e18db476526f3eab815377ab.png

11、详细的执行过程

参考文档:MapReduce阶段基础知识(详细)
MapReduce工作流程图(一):
image.png
流程:

  1. 客户端准备待处理文本
  2. 客户端在真正submit()前,规划好需要切分的片,在stage文件夹生成切片信息、job信息、jar包等文件
  3. 将stage中的切片信息文件、job信息、jar包提交给Yarn
  4. Yarn根据切片信息,进行切片划分MapTask
  5. MapTask中首先由InputFormat处理文件(默认是FileInputFormat的实现类TextInputFormat),如果isSplit方法判断结果允许分片,则创建RecorderReader读取文件并处理成为 key-value形式(TextInputFormat创建的是LineRecordReader,处理结果中 key 为偏移量,value为这一行的内容)。将处理的 key-value 传递给Mapper作为输入
  6. Mapper对传入的 key-value 进行逻辑运算,并将结果通过context.write(key, value)输出给outputCollector
  7. Mapper的 key-value 结果会写到内存中的环形缓冲区中。因为计算结果直接输出到磁盘时,效率比较低下,所以设计了一个缓冲区。先将结果存入缓冲区,达到比例时才溢出(spill)写到磁盘上,溢写的时候根据 key 进行排序(默认根据key字典序排序)。 在写入环形缓冲区之前,会先按照一定的规则对Mapper输出的键值对进行分区(partition)。默认情况下,只有一个ReduceTask,所以不分区。最后分区的数量就是ReduceTask的数量。 环形缓冲区默认100Mb,分为两部分,一部分用来写索引(记录key和value的位置、分区信息等元数据),一部分用来写真实的数据记录。 当环形缓冲区中存储的数据达到 80% 时,就开始逆着向磁盘中溢写:找到索引和数据的中间位置,倒序着向磁盘中写入。此时因为缓冲区还剩20%,所以即使有新数据进来也可以正常写入到内存缓冲区。 当环形缓冲区达到100%时,因为末尾的数据刚刚已经溢写到了磁盘,所以数据可以反方向的向环中写入,覆盖掉已经写入磁盘中的数据。如果内存中的数据写入过多,将要覆盖掉还没写入磁盘的数据时,程序就会进入等待,等这部分数据被写入磁盘后,内存中新数据才能将这部分旧数据覆盖。

    Map输出的数据会写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

    缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

  8. 存入的数据会记录分区信息,当数据达到 80% 时,在溢写前会先进行排序,使用的是快排算法,按照字典排序,排序时是修改的索引元数据中的keystart、valuestart,而不是直接移动 key-value 。溢写后产生spill.index索引文件和 spill.out数据文件

  9. 数据每达到一次 80%,就产生一次溢写文件,最后就会产生大量的溢写文件。 每次溢写只产生一个溢写文件,虽然有多个分区,但是这些分区数据都存储在这一个溢写文件中,只是会把它们在一个文件中分隔开。
  10. 对第9步产生的大量溢写文件进行合并(merge)和排序。 因为第9步的文件中,每个文件在第8步时都单独进行了排序,所以每个文件内部都是有序的。所以再对这些有序文件进行排序时,就可以使用归并排序算法。 合并后的文件依然是按不同分区分隔的,所以排序时也是按照分区进行排序的,不同分区并没有进行整体排序。
  11. 在写入Reduce前会有预聚合,对分区内的相同 key 进行一次预聚合,方便后面发送给下一步。但是预聚合有前提条件,不是每次都能预聚合,例如:{a: 1, a: 1, b: 1}被合并为:{a:2, b:1}

接上图,MapReduce工作流程图(二):
image.png
流程:接上面的步骤

  1. MapTask任务完成后,启动对应数量的ReduceTask处理。 对于数据量较少的文件,一般都会等所有的MapTask都完成时才启动ReduceTask。 但是对于MapTask特别多时,可以配置推测执行等策略,在部分MapTask工作完成后就进行部分Reduce合并
  2. ReduceTask主动从MapTask拉取自己指定分区的数据(不是MapTask推送给ReduceTask)。 每个MapTask的指定分区内数据是有序的,但是ReduceTask会对应多个MapTask,所以还需要对该ReduceTask拿到的所有MapTask指定分区的数据进行合并和归并排序。
  3. 因为13步进行了归并排序,所以可以从前向后遍历所有的 key-value, 如果 key 和前一个相同,就接着获取下一个 key ,直到出现不同的 key ,然后将前面的这些相同 key 的值作为一个集合,连同 key 一起将这一组内容传入reduce()方法。在reduce()方法中执行对应的业务逻辑
  4. 还可以进行分组操作
  5. 将reduce()方法的结果写成数据文件,使用的是OutputFormat(默认使用的TextOutputFormat)

    12、Partition分区

    将统计结果按照条件输出到不同文件中(分区),比如将MapReduce结果按照手机号前3位输出到不同的文件中。
    分区的个数决定了会产生多少个ReduceTask,也决定了最后生成的结果文件。

当不配置ReduceTask个数时,默认只有1个ReduceTask,也就只有1个分区。此时走的分区类是Hadoop的一个内部类,其分区方法getPartition会固定返回一个0,即最后所有的结果都生成到0号文件(part-r-00000)中。

当配置了ReduceTask个数大于1,但是没有指定分区类时,Hadoop 默认使用的分区类是HashPartitioner,分区方式是 hash分区 :

将key取 hash 值,然后对ReduceTask个数取余。key.hashcode() % numReduceTask(每个分区都会产生一个ReduceTask,所以ReduceTask个数就是分区个数)

例如,设置ReduceTask个数为2,最后生成的结果文件中,就会按照 key 的 hash 将结果存入2个结果文件中:

  1. // 设置ReduceTask个数为2,
  2. // 每个ReduceTask产生一个结果文件,最后就会产生2个结果文件:0号文件(part-r-00000)、1号文件(part-r-00001)
  3. // 按照key的hash值,对2取余,结果为0的存入0号文件,结果为1的存入1号文件
  4. job.setNumReduceTasks(2);

用户可以自定义分区类Partitioner:

  1. 自定义类,继承Partitioner,实现getPartition()分区方法
  2. 在 job 驱动类中,设置使用自定义的Partitioner:

    1. job.setPartitionerClass(MyPartitioner.class);
  3. 根据自定义Partitioner的逻辑,设置相应数量的ReduceTask:

自定义Partitioner:

  1. package com.study.mapreduce.partition;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. * 根据手机号前3位分区
  7. * Map输出的key,value
  8. */
  9. public class MyPartitioner extends Partitioner<Text, IntWritable> {
  10. @Override
  11. public int getPartition(Text key, IntWritable value, int numPartitions) {
  12. String phoneStart = key.toString().substring(0, 3);
  13. int result = 0;
  14. // 分区号必须从0开始,逐一增加
  15. switch (phoneStart) {
  16. case "133":
  17. result = 0;
  18. break;
  19. case "139":
  20. result = 1;
  21. break;
  22. case "192":
  23. result = 2;
  24. break;
  25. case "188":
  26. result = 3;
  27. break;
  28. default:
  29. break;
  30. }
  31. return result;
  32. }
  33. }

在Driver中配置使用该分区类,根据分区类中的逻辑设置ReduceTask个数:

  1. // 设置分区类、ReduceTask个数
  2. job.setPartitionerClass(MyPartitioner.class);
  3. job.setNumReduceTasks(4);

如果Partitioner中分区结果有 5 个,而设置的ReduceTask个数是 4 个(比分区个数少),程序运行时就可能出现IOException:Illegal partition for xxxx(4)。
如果设置的ReduceTask为1,那么就不会走我们的Partitioner,而是全部输出的0号文件(走的Hadoop默认的Partitioner内部类)。
如果Partitioner中分区结果有5个,但是设置的 ReduceTask 个数是 6 个,程序可以正常运行,最后会产生 6 个结果文件,且第6个结果文件是空的。这样最后分配的第6个ReduceTask被浪费了,分配了该节点但是没有处理数据。
总结:

  • 如果 ReduceTask的数量 >getPartition的结果数,程序可以正常运行,但是会产生几个空的输出文件,最后几个分配的节点没有处理数据,空耗资源;
  • 如果 1 < ReduceTask的数量 <getPartition的结果数,则有部分数据没有ReduceTask处理,会抛出IOException;
  • 如果 ReduceTask的数量 = 1,则不管有没有设置自定义分区类,最终走的都是Hadoop默认的一个固定返回0的分区类,只会分配一个ReduceTask,也只会产生一个结果文件;
  • 如果ReduceTask的数量 = 0,则表示不进行Reduce汇总,Hadoop也不会再进行Shuffle,直接将MapTask的结果输出到文件
  • 分区getPartition方法中,分区号必须从0开始,逐1增加

    五、Yarn

    Yarn 是一个通用的资源管理系统调度平台,可以为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
    虽然 Yarn 属于Hadoop的一部分,但是Yarn不仅仅能运行 MapReduce,还可以运行 Tez、HBase、Spark、Flink等等程序,理论上支持各种计算程序。Yarn不关心你做的什么,只负责管理资源(内存 和 CPU)。

    1、yarn架构

    Yarn由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。
    image.png

ResourceManager、NodeManager 是集群物理层面的组件,在Hadoop集群搭建过程中需要明确分配Resourcemanager是哪台服务器、NodeManager是哪台服务器。
ApplicationMaster 简称 AppMaster。在 MapReduce程序中的 MRAppMaster 就是 AppMaster的一个具体实现。属于 App 应用层面,Yarn不关心该组件,而是由具体的应用程序(MapReduce、Spark等)去具体实现。
ResourceManager、NodeManager、AppMaster 合称 Yarn的三大组件。
Container容器:是硬件资源的抽象,多个程序之间就可以隔离运行
简单来说,ResourceManager是整个集群的管理者,NodeManager是单个服务器的管理者,ApplicationMaster是每个任务作业的管理者。

2、详细流程图

Yarn执行流程图:
image.png
作业提交过程

  1. Client调用job.waitForCompletion()方法,创建一个YarnRunner(本地模式是LocalRunner)向Yarn集群的ResourceManager申请提交作业运行一个Application
  2. ResourceManager返回给Client一个提交资源的路径、一个Application的id,用于本次Application运行
  3. Client将运行job所需要的资源(jar包、切片、job参数xml等)提交到指定的路径
  4. Client资源提交完毕,申请运行MRAppMaster

ResourceManager处理

  1. ResourceManager收到用户的请求后,初始化为一个Task。 因为Yarn是一个调度平台,ResourceManager可能要同时接收多个不同的应用程序发来的 job,所以ResourceManager会将这个Task放到一个调度队列中

AppMaster处理

  1. 比较空闲的NodeManager到ResourceManager上领取任务,拿到Task
  2. 在该NodeManager上创建一个Container容器,并分配 CPU 和 内存,在该容器中启动MRAppMaster进程
  3. MRAppMaster进程到集群资源路径上,将 job 的资源下载到本地
  4. MRAppMaster根据 job 资源中的切片信息,向ResourceManager申请开启MapTask容器运行MapTask

MapTask处理

  1. MRAppMaster向ResourceManager申请运行MapTask后,其他空闲的NodeManager向ResourceManager领取任务创建容器,分配 cpu内存,下载 jar 包。 因为MapTask是以容器为单位运行的,一个NodeManager上可以运行多个容器。所以可能出现 切片时切分了2个MapTask,结果这两个MapTask都以容器的形式在同一个NodeManager上运行。
  2. MRAppMaster向MapTask发送程序启动脚本,启动这些MapTask。这时这些运行MapTask的NodeManager上就会创建YarnChild进程进行运行。 MapTask会向MRAppMaster汇报自己的进度。当运行完成时,将结果持久化到磁盘,并通知MRAppMaster。

AppMaster处理

  1. MRAppMaster监控到MapTask运行完毕之后,会向ResourceManager申请开启ReduceTask容器运行ReduceTask

ReduceTask处理

  1. 与MapTask类似,NodeManager领取到ReduceTask任务,创建容器,分配cpu内存。然后从Map节点获取相应分区的结果数据。 ReduceTask容器运行的进程也叫做YarnChild。 ReduceTask向MRAppMaster汇报进度。当运行完成时,通知MRAppMaster

AppMaster处理

  1. 在所有ReduceTask都运行完成时,AppMaster向ResourceManager申请注销AppMaster和前面的MapTask、ReduceTask。

3、调度器

调度器(Resource Scheduler)是属于ResourceManager内部的一个组件,当有多个 job 提交过来的时候,使用调度器进行调度。
在Yarn中,负责给应用程序分配资源的就是Scheduler,它是ResourceMananger的核心组件之一。
Scheduler完全专用于调度作业,它无法跟踪应用程序的状态。

Hadoop作业调度器主要有三种:先进先出调度器(FIFO)、容量调度器(Capacity Scheduler)、公平调度器(Fair Scheduler)。

Apache的Hadoop 3.1.3 默认的资源调度器是 容量调度器(Capacity Scheduler)
可以通过配置文件配置:yarn-site.xml

  1. <property>
  2. <description>The class to use as the resource scheduler.</description>
  3. <name>yarn.resourcemanager.scheduler.class</name>
  4. <!-- 默认使用的 Capacity Scheduler -->
  5. <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
  6. </property>

3.1 先进先出调度器(FIFO)

FIFO调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务。调度工作不考虑优先级和范围,适用于负载较低的小规模集群。当使用大型共享集群时,它的效率较低且会导致一些问题。
FIFO Scheduler拥有一个控制全局的队列queue,默认queue名称为default,该调度器会获取当前集群上所有的资源信息作用与这个全局的queue。
优点:无需配置,先到先得,易于执行。
缺点:任务的优先级不会变高,因此高优先级的作业也需要等待,不适合共享集群。
FIFO是 Hadoop 1.x 中的JobTracker原有的调度器实现,在Yarn中保留了下来。实际生产中用到大数据的场景都是高并发的,所以一般不会采用这种调度器
image.png

3.2 容量调度器(Capacity Scheduler)

Capacity Scheduler 是 Yahoo 开发的多用户调度器,是Apache版的 Hadoop 3.x 的默认调度策略。
该策略允许多个组织共享整个集群资源,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。
image.png

3.3 公平调度器(Fair Scheduler)

Fair Scheduler,公平调度器,是 Facebook 开发的多用户调度器。 提供了Yarn应用程序公平地共享大型集群中资源的另一种形式。使所有应用在平均情况下随着时间的流式可以获得相等的资源份额。
Fair Scheduler设计目标是为所有的应用分配公平的资源(对公平的定义通过参数来设置)。
公平调度可以在多个丢列建工作,允许资源共享和抢占。
image.png