第 1 章 MapReduce 概述

1.1 MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
也即:MapReduce = 自己处理业务相关代码 + 框架自身默认代码

1.2 MapReduce 优缺点

1.2.1 优点

1)MapReduce 易于编程
用户只需要关心自身的业务逻辑,分布式计算相关的麻烦交互,由框架处理
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2 缺点

1)不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算 (Spark Stream 、Flink)
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。
这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
3)不擅长 DAG(有向无环图)计算 (Spark 的 RDD 就是基于此的)
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
擅长 DAG 的基础是内存计算

1.3 MapReduce 核心思想

image.png
分布式的运算程序往往需要分成至少 2 个阶段。

  1. 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干
  2. 第二个阶段的 ReduceTask 并发实例互不相干(指不同的输出),但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出

MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段
如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

  • 这样就是 DAG 运算了,每个 MR 都需要读取磁盘,写入磁盘,下一个又如此,导致了大量的磁盘IO,效率低下

总结:分析 WordCount 数据流走向深入理解 MapReduce 核心思想。

1.4 MapReduce 进程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

  1. MrAppMaster:负责整个MR过程调度及状态协调。是 ApplicationMaster 的子类
  2. MapTask:负责 Map 阶段的整个数据处理流程。
  3. ReduceTask:负责 Reduce 阶段的整个数据处理流程。

    1.5 官方 WordCount 源码

    采用反编译工具反编译源码,发现 WordCount 案例有 Map 类、Reduce 类和 Driver 类。且数据的类型是 Hadoop 自身封装的序列化类型。
  • Map 类需要继承 Mapper 类,分别指定其 输入 和 输出 的泛型KV对,类型为 Hadoop 指定的实现了其特定序列化的数据类型
  • Reducer 类同理
  • Driver 类有成熟的书写套路

    1.6 常用数据序列化类型

    除了 String, 其他的都是加上 Writable 后缀即可
    image.png

    1.7 MapReduce 编程规范

    用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。

    1.7.1 Mapper阶段

  1. 用户自定义的Mapper要继承自己的父类 org.apache.hadoop.mapreduce.Mapper
  2. Mapper 的输入数据是 <K, V> 对的形式(KV的类型可自定义)
    • key:常为偏移量,既每行的行号
    • value:每行的内容
  3. Mapper的输出数据是 <K, V> 对的形式(KV的类型可自定义)
  4. Mapper中的业务逻辑写在 map() 方法中
  5. **map()** 方法(MapTask进程)对 每一个调用一次

    1.7.2 Reducer阶段

    (1)用户自定义的Reducer要继承自己的父类 org.apache.hadoop.mapreduce.Reducer
    (2)Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 <K, V>
    (3)Reducer 的业务逻辑写在 reduce() 方法中
    (4)ReduceTask 进程对每一组相同 K <K, V> 组调用一次 reduce() 方法

    • 比如 wordCount 程序,对于 hello,可能多个mapTask 统计出了多个 (hello,1),那输入进来后,就会被合并成一个 (hello,[1,1,2]),调用一次 reduce() 方法

      1.7.3 Driver阶段

      相当于YARN集群的客户端(Client),用于提交我们整个程序到YARN集群,提交的是封装了 MapReduce 程序相关运行参数的 job 对象

      1.8 WordCount 案例实操

      按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver
      image.png

      1.8.1 项目准备

      (1)创建 maven 工程,MapReduceDemo ```xml org.apache.hadoop hadoop-client 3.1.3 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30
  1. 2)在 pom.xml 文件中添加如下依赖
  2. - hadoop 依赖
  3. - log4j1 对应的 Slf4j 的桥接依赖
  4. 3)在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
  5. - 日志配置
  6. ```xml
  7. log4j.rootLogger=INFO, stdout
  8. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  9. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  10. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  11. log4j.appender.logfile=org.apache.log4j.FileAppender
  12. log4j.appender.logfile.File=target/spring.log
  13. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  14. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(4)创建包名:com.haniel.mapreduce.wordcount

1.8.2 编写程序

(1)编写 Mapper 类

  • 注意:Mapper 存在两个包

    • org.apache.hadoop.**mapreduce** 是 2.x 和 3.x 的,只负责 M & R
    • org.apache.hadoop.**mapred** 是 1.x 的,除了 M & R,还负责任务调度 ```java import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // 这个是 2.x 和 3.x 的 /* KEYIN: map阶段输入的key的类型:为偏移量,使用 LongWritable VALUEIN: map阶段输入的value类型:为行内容,使用 Text KEYOUT:map阶段的输出的key的类型,为 单词,使用 Text VALUEOUT:map阶段的输出的value的类型,为 单词的计数,使用 IntWriteable / public class WordCountMapper extends Mapper{ // map 阶段的输出 // 每行都会执行一次map()方法,因此这个输出需要写到map()方法外面去 // 2022-03-11 不理解 Text k = new Text(); IntWritable v = new IntWritable(1);

    /**

    • key: KEYIN
    • value: VALUEOUT
    • context: 进行和系统的联络 */ @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行,转成 java 的String,有更多的操作方法 String line = value.toString(); // 2 切割 String[] words = line.split(“ “); // 3 输出 for (String word : words) {
      1. k.set(word);
      2. context.write(k, v);
      } } } ``` (2)编写 Reducer 类
  • 注意:Reducer 存在两个包

    • org.apache.hadoop.**mapreduce** 是 2.x 和 3.x 的,只负责 M & R
    • org.apache.hadoop.**mapred** 是 1.x 的,除了 M & R,还负责任务调度 ```java import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* KEYIN: reduce阶段输入的key的类型:为单词,使用 Text VALUEIN: reduce阶段输入的value类型:为map阶段单个map的单词出现次数,使用 IntWritable KEYOUT:reduce阶段的输出的key的类型,为 单词,使用 Text VALUEOUT:reduce阶段的输出的value的类型,为reduce阶段后单词的计数,使用 IntWriteable / public class WordCountReducer extends Reducer{ int sum; IntWritable v = new IntWritable();

    /**

    • key: KEYIN
    • values: 所有相同KEYIN的输入形成一个迭代器 */ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) {
      1. sum += count.get();
      } // 2 输出 v.set(sum); context.write(key,v); } }
  1. 3)编写 Driver 驱动类
  2. - 注意导包的问题
  3. ```java
  4. import java.io.IOException;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. public class WordCountDriver {
  13. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  14. // 1 获取配置信息以及获取 job 对象
  15. Configuration conf = new Configuration();
  16. Job job = Job.getInstance(conf);
  17. // 2 关联本 Driver 程序的class
  18. job.setJarByClass(WordCountDriver.class);
  19. // 3 关联 Mapper 和 Reducer 的class
  20. job.setMapperClass(WordCountMapper.class);
  21. job.setReducerClass(WordCountReducer.class);
  22. // 4 设置 Mapper 输出的 kv 类型
  23. job.setMapOutputKeyClass(Text.class);
  24. job.setMapOutputValueClass(IntWritable.class);
  25. // 5 设置最终输出 kv 类型
  26. job.setOutputKeyClass(Text.class);
  27. job.setOutputValueClass(IntWritable.class);
  28. // 6 设置输入和输出路径
  29. FileInputFormat.setInputPaths(job, new Path(args[0]));
  30. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  31. // 7 提交 job
  32. boolean result = job.waitForCompletion(true);
  33. System.exit(result ? 0 : 1);
  34. }
  35. }

1.8.3 本地测试

(1)需要首先配置好 HADOOP_HOME 变量以及 Windows 运行依赖
(2)在 IDEA/Eclipse 上运行程序

1.8.4 提交到集群测试(重点)

(1)用 maven 打 jar 包,需要添加的打包插件依赖

  • maven-assembly-plugin 插件是将项目所有的依赖都打进一个jar中
    • 当对应的集群有项目需要的依赖时,其实是不用把这些依赖打进jar包的
    • 但是当集群没有这些依赖时,且jar包中没有对应的依赖,此时MR就会失败
    • 因此一般建议把项目中所有用到的依赖统统打进jar包中 —> fatjar
  • 注意:如果工程上显示红叉。在项目上右键->maven->Reimport 刷新即可

    1. <build>
    2. <plugins>
    3. <plugin>
    4. <artifactId>maven-compiler-plugin</artifactId>
    5. <version>3.6.1</version>
    6. <configuration>
    7. <source>1.8</source>
    8. <target>1.8</target>
    9. </configuration>
    10. </plugin>
    11. <plugin>
    12. <artifactId>maven-assembly-plugin</artifactId>
    13. <configuration>
    14. <descriptorRefs>
    15. <descriptorRef>jar-with-dependencies</descriptorRef>
    16. </descriptorRefs>
    17. </configuration>
    18. <executions>
    19. <execution>
    20. <id>make-assembly</id>
    21. <phase>package</phase>
    22. <goals>
    23. <goal>single</goal>
    24. </goals>
    25. </execution>
    26. </executions>
    27. </plugin>
    28. </plugins>
    29. </build>

    (2)将程序打成 jar 包
    (3)修改不带依赖的 jar 包名称为 wc.jar,并拷贝该 jar 包到 Hadoop 集群的 **/opt/module/hadoop-3.1.3** 路径。

  • 简单的 wordCount 程序使用到的 Hadoop 依赖在集群中有,junit 用不上,log可以不打印,因此无需使用带依赖的jar包

(4)启动 Hadoop 集群

  1. sbin/start-dfs.sh
  2. sbin/start-yarn.sh

(5)执行 WordCount 程序

  • hadoop jar jar包路径 入口类路径
  • 动态的输入和输出路径:Driver中使用了 args[0] 和 args[1] 分别作为输入和输出,因此这里Driver 类路径后面的参数就是 输入和输出参数了
    1. hadoop jar wc.jar com.haniel.mapreduce.wordcount.WordCountDriver /user/haniel/input /user/haniel/output

    第 2 章 Hadoop 序列化

    2.1 序列化概述

    image.png
    0)序列化的意义
  • 活的对象从一台机器的内存,传送到另一台机器的内存中

1)什么是序列化

  • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2)为什么要序列化

  • 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。
  • 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机

3)为什么不用 Java 的序列化

  • Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
  • 所以,**

4)Hadoop 序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互

2.2 自定义 bean 对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步。

  1. 必须实现 Writable 接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造

    1. public FlowBean() {
    2. super();
    3. }
  3. 重写序列化方法

    1. @Override
    2. public void write(DataOutput out) throws IOException {
    3. out.writeLong(upFlow);
    4. out.writeLong(downFlow);
    5. out.writeLong(sumFlow);
    6. }
  4. 重写反序列化方法

    1. @Override
    2. public void readFields(DataInput in) throws IOException {
    3. upFlow = in.readLong();
    4. downFlow = in.readLong();
    5. sumFlow = in.readLong();
    6. }
  5. 注意反序列化的顺序和序列化的顺序完全一致

  6. 要想把结果显示在文件中,需要重写 **toString()**,可用”\t”分开,方便后续用
    • 比如 MR 的结果含有这个 bean,且需要将 bean 的输出到文件时,就需要重写 toString,否则调用 toString 方法打印的是一个地址值
    • 其实这个和 Hadoop 的序列化没啥关系,就是对象打印的常识
  7. 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。详见后面排序案例。
    • 如果是放在 value 中,则不需要
      1. @Override
      2. public int compareTo(FlowBean o) {
      3. // 倒序排列,从大到小
      4. return this.sumFlow > o.getSumFlow() ? -1 : 1;
      5. }

      2.3 序列化案例实操

      image.png
      输入信息
      image.png
      phone_data.txt

      2.3.1 编写 MapReduce 程序

      编写流量统计的 Bean 对象
  • 接入 Hadoop 框架指定的序列化方式,实现 Writable 接口

    1. import org.apache.hadoop.io.Writable;
    2. import java.io.DataInput;
    3. import java.io.DataOutput;
    4. import java.io.IOException;
    5. //1 继承 Writable 接口
    6. public class FlowBean implements Writable {
    7. private long upFlow; //上行流量
    8. private long downFlow; //下行流量
    9. private long sumFlow; //总流量
    10. //2 提供无参构造
    11. public FlowBean() {
    12. }
    13. //3 提供三个参数的 getter 和 setter 方法
    14. public long getUpFlow() {
    15. return upFlow;
    16. }
    17. public void setUpFlow(long upFlow) {
    18. this.upFlow = upFlow;
    19. }
    20. public long getDownFlow() {
    21. return downFlow;
    22. }
    23. public void setDownFlow(long downFlow) {
    24. this.downFlow = downFlow;
    25. }
    26. public long getSumFlow() {
    27. return sumFlow;
    28. }
    29. public void setSumFlow(long sumFlow) {
    30. this.sumFlow = sumFlow;
    31. }
    32. public void setSumFlow() {
    33. this.sumFlow = this.upFlow + this.downFlow;
    34. }
    35. //4 实现序列化和反序列化方法,注意顺序一定要保持一致
    36. @Override
    37. public void write(DataOutput dataOutput) throws IOException {
    38. dataOutput.writeLong(upFlow);
    39. dataOutput.writeLong(downFlow);
    40. dataOutput.writeLong(sumFlow);
    41. }
    42. @Override
    43. public void readFields(DataInput dataInput) throws IOException {
    44. this.upFlow = dataInput.readLong();
    45. this.downFlow = dataInput.readLong();
    46. this.sumFlow = dataInput.readLong();
    47. }
    48. //5 重写 toString
    49. @Override
    50. public String toString() {
    51. return upFlow + "\t" + downFlow + "\t" + sumFlow;
    52. }
    53. }

    编写 Mapper 类

  • 输入为:行号(偏移值),行内容

  • 输出为:手机号,FlowBean对象

    • 对象需要在多台机器间流转(MR),因此必须要序列化,而这用的Hadoop,所以用Hadoop的序列化方式

      1. import org.apache.hadoop.io.LongWritable;
      2. import org.apache.hadoop.io.Text;
      3. import org.apache.hadoop.mapreduce.Mapper;
      4. import java.io.IOException;
      5. public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
      6. private Text outK = new Text();
      7. private FlowBean outV = new FlowBean();
      8. @Override
      9. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      10. //1 获取一行数据,转成字符串
      11. String line = value.toString();
      12. //2 切割数据
      13. String[] split = line.split("\t");
      14. //3 抓取我们需要的数据:手机号,上行流量,下行流量
      15. String phone = split[1];
      16. String up = split[split.length - 3];
      17. String down = split[split.length - 2];
      18. //4 封装 outK outV
      19. outK.set(phone);
      20. outV.setUpFlow(Long.parseLong(up));
      21. outV.setDownFlow(Long.parseLong(down));
      22. outV.setSumFlow();
      23. //5 写出 outK outV
      24. context.write(outK, outV);
      25. }
      26. }

      编写 Reducer 类

  • 输入:手机号,FlowBean对象

  • 输出:手机号,FlowBean对象

    1. import org.apache.hadoop.io.Text;
    2. import org.apache.hadoop.mapreduce.Reducer;
    3. import java.io.IOException;
    4. public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    5. private FlowBean outV = new FlowBean();
    6. @Override
    7. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
    8. long totalUp = 0;
    9. long totalDown = 0;
    10. //1 遍历 values,将其中的上行流量,下行流量分别累加
    11. for (FlowBean flowBean : values) {
    12. totalUp += flowBean.getUpFlow();
    13. totalDown += flowBean.getDownFlow();
    14. }
    15. //2 封装 outKV
    16. outV.setUpFlow(totalUp);
    17. outV.setDownFlow(totalDown);
    18. outV.setSumFlow();
    19. //3 写出 outK outV
    20. context.write(key,outV);
    21. }
    22. }

    编写 Driver 驱动类

    1. import org.apache.hadoop.conf.Configuration;
    2. import org.apache.hadoop.fs.Path;
    3. import org.apache.hadoop.io.Text;
    4. import org.apache.hadoop.mapreduce.Job;
    5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    6. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    7. import java.io.IOException;
    8. public class FlowDriver {
    9. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    10. //1 获取 job 对象
    11. Configuration conf = new Configuration();
    12. Job job = Job.getInstance(conf);
    13. //2 关联本 Driver 类
    14. job.setJarByClass(FlowDriver.class);
    15. //3 关联 Mapper 和 Reducer
    16. job.setMapperClass(FlowMapper.class);
    17. job.setReducerClass(FlowReducer.class);
    18. //4 设置 Map 端输出 KV 类型
    19. job.setMapOutputKeyClass(Text.class);
    20. job.setMapOutputValueClass(FlowBean.class);
    21. //5 设置程序最终输出的 KV 类型
    22. job.setOutputKeyClass(Text.class);
    23. job.setOutputValueClass(FlowBean.class);
    24. //6 设置程序的输入输出路径
    25. FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
    26. FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));
    27. //7 提交 Job
    28. boolean b = job.waitForCompletion(true);
    29. System.exit(b ? 0 : 1);
    30. }
    31. }