MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

image.png
(1) 分布式的运算程序往往需要分成至少 2 个阶段。
(2) 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3) 第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
(4) MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
总结: 分析 WordCount 数据流走向深入理解 MapReduce 核心思想。

MapReduce 进程
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(1) MrAppMaster:负责整个程序的过程调度及状态协调。
(2) MapTask:负责 Map 阶段的整个数据处理流程。
(3) ReduceTask:负责 Reduce 阶段的整个数据处理流程。

一、数据序列化类型

  1. Java 类型 Hadoop Writable 类型
  2. Boolean BooleanWritable
  3. Byte ByteWritable
  4. Int IntWritable
  5. Float FloatWritable
  6. Long LongWritable
  7. Double DoubleWritable
  8. String Text
  9. Map MapWritable
  10. Array ArrayWritable
  11. Null NullWritable

二、MapReduce编程规范

用户编写的程序分成三个部分: Mapper、 Reducer 和 Driver。

1、Mapper阶段

  • 用户自定义Mapper继承父类
  • Mapper输入数据是KV对
  • Mapper的业务逻辑写在map()方法
  • Mapper的输出数据是KV对
  • map()方法(MapTask进程)对每一个KV调用一次

    2、Reducer阶段

  • 用户自定义的Reducer继承父类

  • Reducer的输入数据类型对应Mapper的输出数据类型
  • Reducer的业务逻辑写在reduce()方法中
  • ReduceTask进程对每一组相同k的kv调用一次reduce()

    3、Driver阶段

    Yarn集群的客户端,用于提交程序到Yarn。提交的是:封装了MapReduce程序相关运行参数的job对象

三、Hadoop序列化

序列化概述
1) 什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息, Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。
Hadoop 序列化特点:
(1) 紧凑 : 高效使用存储空间。
(2)快速: 读写数据的额外开销小。
(3)互操作: 支持多语言的交互

3)自定义 bean 对象实现序列化接口(Writable)

  1. 在企业开发中往往常用的基本序列化类型不能满足所有需求, 比如在 Hadoop 框架内部
  2. 传递一个 bean 对象, 那么该对象就需要实现序列化接口。
  3. 具体实现 bean 对象序列化步骤如下 7 步。
  4. 1 必须实现 Writable 接口
  5. 2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
  6. public FlowBean() {
  7. super();
  8. }
  9. 3)重写序列化方法
  10. @Override
  11. public void write(DataOutput out) throws IOException {
  12. out.writeLong(upFlow);
  13. out.writeLong(downFlow);
  14. out.writeLong(sumFlow);
  15. }
  16. 4)重写反序列化方法
  17. @Override
  18. public void readFields(DataInput in) throws IOException {
  19. upFlow = in.readLong();
  20. downFlow = in.readLong();
  21. sumFlow = in.readLong();
  22. }
  23. 5)注意反序列化的顺序和序列化的顺序完全一致
  24. 6)要想把结果显示在文件中,需要重写 toString(),可用"\t"分开,方便后续用。
  25. 7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。 详见后面排序案例。
  26. @Override
  27. public int compareTo(FlowBean o) {
  28. // 倒序排列,从大到小
  29. return this.sumFlow > o.getSumFlow() ? -1 : 1;
  30. }

4、MapReduce 框架原理
image.png

(1) InputFormat 数据输入

1. 切片与 MapTask 并行度决定机制

MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
思考: 1G 的数据, 启动 8 个 MapTask, 可以提高集群的并发处理能力。那么 1K 的数
据,也启动 8 个 MapTask,会提高集群性能吗? MapTask 并行任务是否越多越好呢? 哪些因
素影响了 MapTask 并行度?
MapTask 并行度决定机制 :
数据块: Block 是 HDFS 物理上把数据分成一块一块。 数据块是 HDFS 存储数据单位。
数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。
image.png

2. Job 提交流程源码和切片源码详解
image.png
image.png

3. FileInputFormat 切片机制
image.png

image.png

4. TextInputFormat 切片机制

1) FileInputFormat 实现类
思考: 在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制
格式文件、数据库表等。 那么,针对不同的数据类型, MapReduce 是如何读取这些数据的呢?
FileInputFormat 常见的接口实现类包括: TextInputFormat、 KeyValueTextInputFormat、
NLineInputFormat、 CombineTextInputFormat 和自定义 InputFormat 等。
2) TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。 键是存储该行在
整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止
符(换行符和回车符), Text 类型。

5. CombineTextInputFormat 切片机制

(2) MapReduce工作流程

image.png
image.png
上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第
16 步结束, 具体 Shuffle 过程详解, 如下:
(1) MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
(2) 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3) 多个溢出文件会被合并成大的溢出文件
(4) 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
(5) ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
(6) ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件, ReduceTask 会将这些文件再进行合并(归并排序)
(7) 合并成大文件后, Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)注意:(1) Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。(2) 缓冲区的大小可以通过参数调整,参数: mapreduce.task.io.sort.mb 默认 100M。

(3) Shuffle 机制
image.png

  • Partition 分区
  • WritableComparable 排序
  • Combiner 合并

    (4) OutputFormat 数据输出

    (5)内核源码

    (6) Join应用

5、Hadoop压缩